You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/07/06 01:45:03 UTC

[02/38] qpid-proton git commit: PROTON-881: Initial commit of proton-j reactor implementation

PROTON-881: Initial commit of proton-j reactor implementation


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e0187017
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e0187017
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e0187017

Branch: refs/heads/master
Commit: e0187017456a4df58df2f1d04b1941d99eacbe10
Parents: cb4f9b9
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Fri Apr 17 14:33:11 2015 +0100
Committer: Adrian Preston <pr...@uk.ibm.com>
Committed: Wed May 6 23:22:27 2015 +0100

----------------------------------------------------------------------
 examples/java/reactor/.gitignore                |   1 +
 .../proton/example/reactor/CountRandomly.java   | 104 +++++
 .../qpid/proton/example/reactor/Counter.java    |  81 ++++
 .../qpid/proton/example/reactor/Delegates.java  |  68 +++
 .../example/reactor/EchoInputStreamWrapper.java |  76 ++++
 .../proton/example/reactor/GlobalLogger.java    | 114 +++++
 .../proton/example/reactor/GoodbyeWorld.java    |  62 +++
 .../qpid/proton/example/reactor/HelloWorld.java |  60 +++
 .../proton/example/reactor/ReactorLogger.java   | 103 +++++
 .../qpid/proton/example/reactor/Scheduling.java |  60 +++
 .../qpid/proton/example/reactor/Unhandled.java  |  46 ++
 .../java/org/apache/qpid/proton/Proton.java     |  20 +-
 .../apache/qpid/proton/engine/BaseHandler.java  |  28 ++
 .../apache/qpid/proton/engine/Collector.java    |   1 +
 .../apache/qpid/proton/engine/Connection.java   |   4 +-
 .../org/apache/qpid/proton/engine/Event.java    |  10 +
 .../org/apache/qpid/proton/engine/Handler.java  |  18 +
 .../qpid/proton/engine/HandlerEndpoint.java     |  28 ++
 .../org/apache/qpid/proton/engine/Link.java     |   4 +-
 .../org/apache/qpid/proton/engine/Session.java  |   2 +-
 .../qpid/proton/engine/impl/CollectorImpl.java  |  10 +-
 .../qpid/proton/engine/impl/ConnectionImpl.java |   2 +-
 .../qpid/proton/engine/impl/EventImpl.java      | 103 ++++-
 .../proton/engine/impl/HandlerEndpointImpl.java |  44 ++
 .../qpid/proton/engine/impl/LinkImpl.java       |  28 +-
 .../qpid/proton/engine/impl/SessionImpl.java    |  14 +-
 .../qpid/proton/engine/impl/TransportImpl.java  |  25 ++
 .../org/apache/qpid/proton/reactor/Reactor.java |  99 +++++
 .../apache/qpid/proton/reactor/Selectable.java  | 113 +++++
 .../apache/qpid/proton/reactor/Selector.java    |  43 ++
 .../org/apache/qpid/proton/reactor/Task.java    |  35 ++
 .../qpid/proton/reactor/impl/IOHandler.java     | 333 ++++++++++++++
 .../qpid/proton/reactor/impl/ReactorImpl.java   | 445 +++++++++++++++++++
 .../proton/reactor/impl/SelectableImpl.java     | 272 ++++++++++++
 .../qpid/proton/reactor/impl/SelectorImpl.java  | 137 ++++++
 .../qpid/proton/reactor/impl/TaskImpl.java      |  83 ++++
 .../apache/qpid/proton/reactor/impl/Timer.java  |  70 +++
 37 files changed, 2725 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/.gitignore
----------------------------------------------------------------------
diff --git a/examples/java/reactor/.gitignore b/examples/java/reactor/.gitignore
new file mode 100644
index 0000000..5e56e04
--- /dev/null
+++ b/examples/java/reactor/.gitignore
@@ -0,0 +1 @@
+/bin

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
new file mode 100644
index 0000000..0dcdf4a
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// Let's try to modify our counter example. In addition to counting to
+// 10 in quarter second intervals, let's also print out a random number
+// every half second. This is not a super easy thing to express in a
+// purely sequential program, but not so difficult using events.
+public class CountRandomly extends BaseHandler {
+
+    private long startTime;
+    private CounterHandler counter;
+
+    class CounterHandler extends BaseHandler {
+        private final int limit;
+        private int count;
+
+        CounterHandler(int limit) {
+            this.limit = limit;
+        }
+
+        @Override
+        public void onTimerTask(Event event) {
+            count += 1;
+            System.out.println(count);
+
+            if (!done()) {
+                event.getReactor().schedule(250, this);
+            }
+        }
+
+        private boolean done() {
+            return count >= limit;
+        }
+    }
+
+    @Override
+    public void onReactorInit(Event event) {
+        startTime = System.currentTimeMillis();
+        System.out.println("Hello, World!");
+
+        // Save the counter instance in a field so we can refer to
+        // it later.
+        counter = new CounterHandler(10);
+        event.getReactor().schedule(250, counter);
+
+        // Now schedule another event with a different handler. Note
+        // that the timer tasks go to separate handlers, and they don't
+        // interfere with each other.
+        event.getReactor().schedule(500, this);
+    }
+
+    @Override
+    public void onTimerTask(Event event) {
+        // keep on shouting until we are done counting
+        System.out.println("Yay, " + Math.round(Math.abs((Math.random() * 110) - 10)));
+        if (!counter.done()) {
+            event.getReactor().schedule(500, this);
+        }
+    }
+
+    @Override
+    public void onReactorFinal(Event event) {
+        long elapsedTime = System.currentTimeMillis() - startTime;
+        System.out.println("Goodbye, World! (after " + elapsedTime + " long milliseconds)");
+    }
+
+    public static void main(String[] args) throws IOException {
+        // In HelloWorld.java we said the reactor exits when there are no more
+        // events to process. While this is true, it's not actually complete.
+        // The reactor exits when there are no more events to process and no
+        // possibility of future events arising. For that reason the reactor
+        // will keep running until there are no more scheduled events and then
+        // exit.
+        Reactor reactor = Proton.reactor(new CountRandomly());
+        reactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
new file mode 100644
index 0000000..a34038e
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+public class Counter extends BaseHandler {
+
+    private long startTime;
+
+    class CounterHandler extends BaseHandler {
+        private final int limit;
+        private int count;
+
+        CounterHandler(int limit) {
+            this.limit = limit;
+        }
+
+        @Override
+        public void onTimerTask(Event event) {
+            count += 1;
+            System.out.println(count);
+            if (count < limit) {
+                event.getReactor().schedule(250, this);
+            }
+        }
+    }
+
+    @Override
+    public void onReactorInit(Event event) {
+        startTime = System.currentTimeMillis();
+        System.out.println("Hello, World!");
+
+        // Note that unlike the previous scheduling example, we pass in
+        // a separate object for the handler. This means that the timer
+        // event we just scheduled will not be seen by Counter as it is
+        // being handled by the CounterHandler instance we create.
+        event.getReactor().schedule(250, new CounterHandler(10));
+    }
+
+    @Override
+    public void onReactorFinal(Event event) {
+        long elapsedTime = System.currentTimeMillis() - startTime;
+        System.out.println("Goodbye, World! (after " + elapsedTime + " long milliseconds)");
+    }
+
+    public static void main(String[] args) throws IOException {
+        // In HelloWorld.java we said the reactor exits when there are no more
+        // events to process. While this is true, it's not actually complete.
+        // The reactor exits when there are no more events to process and no
+        // possibility of future events arising. For that reason the reactor
+        // will keep running until there are no more scheduled events and then
+        // exit.
+        Reactor reactor = Proton.reactor(new Counter());
+        reactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
new file mode 100644
index 0000000..7b4e36f
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// Events know how to dispatch themselves to handlers. By combining
+// this with onUnhandled, you can provide a kind of inheritance
+// between handlers using delegation.
+public class Delegates extends BaseHandler {
+
+    private final Handler[] handlers;
+
+    static class Hello extends BaseHandler {
+        @Override
+        public void onReactorInit(Event e) {
+            System.out.println("Hello, World!");
+        }
+    }
+
+    static class Goodbye extends BaseHandler {
+        @Override
+        public void onReactorFinal(Event e) {
+            System.out.println("Goodbye, World!");
+        }
+    }
+
+    public Delegates(Handler... handlers) {
+        this.handlers = handlers;
+    }
+
+    @Override
+    public void onUnhandled(Event event) {
+        for (Handler handler : handlers) {
+            event.dispatch(handler);
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        Reactor reactor = Proton.reactor(new Delegates(new Hello(), new Goodbye()));
+        reactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
new file mode 100644
index 0000000..2e53d09
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Pipe;
+import java.nio.channels.Pipe.SinkChannel;
+import java.nio.channels.Pipe.SourceChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class EchoInputStreamWrapper extends Thread {
+
+    private final InputStream in;
+    private final SinkChannel out;
+    private final byte[] bufferBytes = new byte[1024];
+    private final ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
+    private final AtomicInteger idCounter = new AtomicInteger();
+
+    private EchoInputStreamWrapper(InputStream in, SinkChannel out) {
+        this.in = in;
+        this.out = out;
+        setName(getClass().getName() + "-" + idCounter.incrementAndGet());
+        setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+        try {
+            while(true) {
+                int amount = in.read(bufferBytes);
+                if (amount < 0) break;
+                buffer.position(0);
+                buffer.limit(amount);
+                out.write(buffer);
+            }
+        } catch(IOException ioException) {
+            ioException.printStackTrace();
+        } finally {
+            try {
+                out.close();
+            } catch(IOException ioException) {
+                ioException.printStackTrace();
+            }
+        }
+    }
+
+    public static SourceChannel wrap(InputStream in) throws IOException {
+        Pipe pipe = Pipe.open();
+        new EchoInputStreamWrapper(in, pipe.sink()).start();
+        SourceChannel result = pipe.source();
+        result.configureBlocking(false);
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
new file mode 100644
index 0000000..1bb3d3e
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+/*
+# Not every event goes to the reactor's event handler. If we have a
+# separate handler for something like a scheduled task, then those
+# events aren't logged by the logger associated with the reactor's
+# handler. Sometimes this is useful if you don't want to see them, but
+# sometimes you want the global picture.
+
+class Logger:
+
+    def on_unhandled(self, name, event):
+        print "LOG:", name, event
+
+class Task:
+
+    def on_timer_task(self, event):
+        print "Mission accomplished!"
+
+class Program:
+
+    def on_reactor_init(self, event):
+        print "Hello, World!"
+        event.reactor.schedule(0, Task())
+
+    def on_reactor_final(self, event):
+        print "Goodbye, World!"
+
+r = Reactor(Program())
+
+# In addition to having a regular handler, the reactor also has a
+# global handler that sees every event. By adding the Logger to the
+# global handler instead of the regular handler, we can log every
+# single event that occurs in the system regardless of whether or not
+# there are specific handlers associated with the objects that are the
+# target of those events.
+r.global_handler.add(Logger())
+r.run()
+
+ */
+
+// Not every event goes to the reactor's event handler. If we have a
+// separate handler for something like a scheduled task, then those
+// events aren't logged by the logger associated with the reactor's
+// handler. Sometimes this is useful if you don't want to see them, but
+// sometimes you want the global picture.
+public class GlobalLogger extends BaseHandler {
+
+    static class Logger extends BaseHandler {
+        @Override
+        public void onUnhandled(Event event) {
+            System.out.println("LOG: " + event);
+        }
+    }
+
+    static class Task extends BaseHandler {
+        @Override
+        public void onTimerTask(Event e) {
+            System.out.println("Mission accomplished!");
+        }
+    }
+
+    @Override
+    public void onReactorInit(Event event) {
+        System.out.println("Hello, World!");
+        event.getReactor().schedule(0, new Task());
+    }
+
+    @Override
+    public void onReactorFinal(Event e) {
+        System.out.println("Goodbye, World!");
+    }
+
+    public static void main(String[] args) throws IOException {
+        Reactor reactor = Proton.reactor(new GlobalLogger());
+
+        // In addition to having a regular handler, the reactor also has a
+        // global handler that sees every event. By adding the Logger to the
+        // global handler instead of the regular handler, we can log every
+        // single event that occurs in the system regardless of whether or not
+        // there are specific handlers associated with the objects that are the
+        // target of those events.
+        reactor.getGlobalHandler().add(new Logger());
+        reactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
new file mode 100644
index 0000000..b04273b
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// TODO: sort out docs!
+// So far the reactive hello-world doesn't look too different from a
+// regular old non-reactive hello-world. The onReactorInit method can
+// be used roughly as a 'main' method would. A program that only uses
+// that one event, however, isn't going to be very reactive. By using
+// other events, we can write a fully reactive program.
+
+public class GoodbyeWorld extends BaseHandler {
+
+    // As before we handle the reactor init event.
+    @Override
+    public void onReactorInit(Event event) {
+        System.out.println("Hello, World!");
+    }
+
+    // In addition to an initial event, the reactor also produces an
+    // event when it is about to exit. This may not behave much
+    // differently than just putting the goodbye print statement inside
+    // onReactorInit, but as we grow our program, this piece of it
+    // will always be what happens last, and will always happen
+    // regardless of what other paths the main logic of our program
+    // might take.
+    @Override
+    public void onReactorFinal(Event e) {
+        System.out.println("Goodbye, World!");;
+    }
+
+    public static void main(String[] args) throws IOException {
+        Reactor reactor = Proton.reactor(new GoodbyeWorld());
+        reactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
new file mode 100644
index 0000000..745004e
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// TODO: sort out docs!
+/*
+ * The proton reactor provides a general purpose event processing
+ * library for writing reactive programs. A reactive program is defined
+ * by a set of event handlers. An event handler is just any class or
+ * object that implements the Handler interface. For convenience, a class
+ * can extend BaseHandler and only handle the events that it cares to
+ * implement methods for.
+ */
+public class HelloWorld extends BaseHandler {
+
+    // The reactor init event is produced by the reactor itself when it
+    // starts.
+    @Override
+    public void onReactorInit(Event event) {
+        System.out.println("Hello, World!");
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        // When you construct a reactor, you can give it a handler that
+        // is used by default.
+        Reactor reactor = Proton.reactor(new HelloWorld());
+
+        // When you call run, the reactor will process events. The reactor init
+        // event is what kicks off everything else. When the reactor has no
+        // more events to process, it exits.
+        reactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
new file mode 100644
index 0000000..b4a8cba
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+/*
+class Logger:
+
+    def on_unhandled(self, name, event):
+        print "LOG:", name, event
+
+class Program:
+
+    def on_reactor_init(self, event):
+        print "Hello, World!"
+
+    def on_reactor_final(self, event):
+        print "Goodbye, World!"
+
+# You can pass multiple handlers to a reactor when you construct it.
+# Each of these handlers will see every event the reactor sees. By
+# combining this with on_unhandled, you can log each event that goes
+# to the reactor.
+r = Reactor(Program(), Logger())
+r.run()
+
+# Note that if you wanted to add the logger later, you could also
+# write the above as below. All arguments to the reactor are just
+# added to the default handler for the reactor.
+
+def logging_enabled():
+    return False
+
+r = Reactor(Program())
+if logging_enabled():
+    r.handler.add(Logger())
+r.run()
+
+ */
+public class ReactorLogger extends BaseHandler {
+
+    public static class Logger extends BaseHandler {
+        @Override
+        public void onUnhandled(Event event) {
+            System.out.println("LOG: " + event);
+        }
+    }
+
+    @Override
+    public void onReactorInit(Event e) {
+        System.out.println("Hello, World!");
+    }
+
+    @Override
+    public void onReactorFinal(Event e) {
+        System.out.println("Goodbye, World!");
+    }
+
+    private static boolean loggingEnabled = false;
+
+    public static void main(String[] args) throws IOException {
+
+        // You can pass multiple handlers to a reactor when you construct it.
+        // Each of these handlers will see every event the reactor sees. By
+        // combining this with onUnhandled, you can log each event that goes
+        // to the reactor.
+        Reactor reactor = Proton.reactor(new ReactorLogger(), new Logger());
+        reactor.run();
+
+        // Note that if you wanted to add the logger later, you could also
+        // write the above as below. All arguments to the reactor are just
+        // added to the default handler for the reactor.
+        reactor = Proton.reactor(new ReactorLogger());
+        if (loggingEnabled)
+            reactor.getHandler().add(new Logger());
+        reactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
new file mode 100644
index 0000000..47e0cb3
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Task;
+
+public class Scheduling extends BaseHandler {
+
+    private long startTime;   // set in onReactorInit, read in onReactorFinal
+
+    @Override
+    public void onReactorInit(Event event) {
+        startTime = System.currentTimeMillis();
+        System.out.println("Hello, World!");
+        // Schedule a task for this handler and tag it with an attachment.
+        event.getReactor().schedule(1000, this).setAttachment("Yay");
+    }
+
+    @Override
+    public void onTimerTask(Event event) {
+        Task fired = event.getTask();
+        System.out.println(fired.getAttachment() + " my task is complete!");
+    }
+
+    @Override
+    public void onReactorFinal(Event e) {
+        System.out.println("Goodbye, World! (after "
+                + (System.currentTimeMillis() - startTime) + " long milliseconds)");
+    }
+
+    public static void main(String[] args) throws IOException {
+        Reactor schedulingReactor = Proton.reactor(new Scheduling());
+        schedulingReactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
new file mode 100644
index 0000000..a3cc200
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+public class Unhandled extends BaseHandler {
+
+    // BaseHandler's default event callbacks forward to onUnhandled, so a
+    // handler that overrides only onUnhandled is told about every event it
+    // has no specific callback for. This can be useful not only for
+    // debugging, but for logging and for delegating/inheritance.
+    @Override
+    public void onUnhandled(Event unhandledEvent) {
+        System.out.println(unhandledEvent);
+    }
+
+    public static void main(String[] args) throws IOException {
+        final Reactor unhandledDemo = Proton.reactor(new Unhandled());
+        unhandledDemo.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
index 39b04e5..b64225a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
@@ -21,8 +21,7 @@
 package org.apache.qpid.proton;
 
 import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Footer;
@@ -33,14 +32,16 @@ import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.Codec;
 import org.apache.qpid.proton.codec.Data;
 import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.engine.Engine;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Engine;
+import org.apache.qpid.proton.engine.Handler;
 import org.apache.qpid.proton.engine.SslDomain;
 import org.apache.qpid.proton.engine.SslPeerDetails;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.reactor.Reactor;
 
 public final class Proton
 {
@@ -110,4 +111,17 @@ public final class Proton
         return Driver.Factory.create();
     }
 
+    public static Reactor reactor() throws IOException
+    {
+        return Reactor.Factory.create();   // new reactor; this overload adds no handlers itself
+    }
+
+    public static Reactor reactor(Handler... handlers) throws IOException
+    {
+        final Reactor created = reactor();
+        for (int i = 0; i < handlers.length; i++) {   // added in argument order
+            created.getHandler().add(handlers[i]);
+        }
+        return created;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java
index 94f4d12..ac17c5e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.proton.engine;
 
+import java.util.HashSet;
+import java.util.Iterator;
+
 
 /**
  * BaseHandler
@@ -28,6 +31,7 @@ package org.apache.qpid.proton.engine;
 
 public class BaseHandler implements Handler
 {
+    private HashSet<Handler> children = new HashSet<Handler>();
 
     @Override public void onConnectionInit(Event e) { onUnhandled(e); }
     @Override public void onConnectionLocalOpen(Event e) { onUnhandled(e); }
@@ -62,6 +66,30 @@ public class BaseHandler implements Handler
     @Override public void onTransportTailClosed(Event e) { onUnhandled(e); }
     @Override public void onTransportClosed(Event e) { onUnhandled(e); }
 
+    @Override public void onReactorInit(Event e) { onUnhandled(e); }
+    @Override public void onReactorQuiesced(Event e) { onUnhandled(e); }
+    @Override public void onReactorFinal(Event e) { onUnhandled(e); }
+
+    @Override public void onTimerTask(Event e) { onUnhandled(e); }
+
+    @Override public void onSelectableInit(Event e) { onUnhandled(e); }
+    @Override public void onSelectableUpdated(Event e) { onUnhandled(e); }
+    @Override public void onSelectableReadable(Event e) { onUnhandled(e); }
+    @Override public void onSelectableWritable(Event e) { onUnhandled(e); }
+    @Override public void onSelectableExpired(Event e) { onUnhandled(e); }
+    @Override public void onSelectableError(Event e) { onUnhandled(e); }
+    @Override public void onSelectableFinal(Event e) { onUnhandled(e); }
+
     @Override public void onUnhandled(Event event) {}
 
+    @Override
+    public void add(Handler child) {   // stored in the children set, so an equal handler is kept only once
+        children.add(child);
+    }
+
+    @Override
+    public Iterator<Handler> children() {   // iterator over the backing set itself, not a copy
+        return children.iterator();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java
index f9e6fe5..142406e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java
@@ -41,4 +41,5 @@ public interface Collector
 
     void pop();
 
+    boolean more();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
index 5547a57..5cb57a2 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
@@ -22,8 +22,8 @@ package org.apache.qpid.proton.engine;
 
 import java.util.EnumSet;
 import java.util.Map;
-import org.apache.qpid.proton.amqp.Symbol;
 
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.impl.ConnectionImpl;
 
 
@@ -35,7 +35,7 @@ import org.apache.qpid.proton.engine.impl.ConnectionImpl;
  * {@link #sessionHead(EnumSet, EnumSet)}, {@link #linkHead(EnumSet, EnumSet)}
  * {@link #getWorkHead()} respectively.
  */
-public interface Connection extends Endpoint
+public interface Connection extends HandlerEndpoint
 {
 
     public static final class Factory

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
index ddb6937..d69b282 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.proton.engine;
 
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
+import org.apache.qpid.proton.reactor.Task;
+
 
 /**
  * Event
@@ -95,6 +99,12 @@ public interface Event
 
     Transport getTransport();
 
+    Reactor getReactor();
+
+    Selectable getSelectable();
+
+    Task getTask();
+
     Event copy();
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java
index 5ff77e0..fe72091 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.proton.engine;
 
+import java.util.Iterator;
+
 
 /**
  * Handler
@@ -62,6 +64,22 @@ public interface Handler
     void onTransportTailClosed(Event e);
     void onTransportClosed(Event e);
 
+    void onReactorInit(Event e);
+    void onReactorQuiesced(Event e);
+    void onReactorFinal(Event e);
+
+    void onTimerTask(Event e);
+
+    void onSelectableInit(Event e);
+    void onSelectableUpdated(Event e);
+    void onSelectableReadable(Event e);
+    void onSelectableWritable(Event e);
+    void onSelectableExpired(Event e);
+    void onSelectableError(Event e);
+    void onSelectableFinal(Event e);
+
     void onUnhandled(Event e);
 
+    void add(Handler child);
+    Iterator<Handler> children();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java
new file mode 100644
index 0000000..ecadc0a
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.engine;
+
+public interface HandlerEndpoint extends Endpoint {
+
+    void add(Handler handler);   // adds a handler to this endpoint
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
index c965a29..caafc14 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
@@ -21,7 +21,7 @@
 package org.apache.qpid.proton.engine;
 
 import java.util.EnumSet;
-import java.util.Iterator;
+
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.amqp.transport.Source;
@@ -37,7 +37,7 @@ import org.apache.qpid.proton.amqp.transport.Target;
  *
  * TODO describe the application's responsibility to honour settlement.
  */
-public interface Link extends Endpoint
+public interface Link extends HandlerEndpoint
 {
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
index f2f048a..eaddac0 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
@@ -28,7 +28,7 @@ import java.util.EnumSet;
  *
  * Note that session level flow control is handled internally by Proton.
  */
-public interface Session extends Endpoint
+public interface Session extends HandlerEndpoint
 {
     /**
      * Returns a newly created sender endpoint

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
index e222819..fe09a23 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
@@ -23,9 +23,6 @@ package org.apache.qpid.proton.engine.impl;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Event;
 
-import java.util.LinkedList;
-import java.util.Queue;
-
 
 /**
  * CollectorImpl
@@ -42,11 +39,13 @@ public class CollectorImpl implements Collector
     public CollectorImpl()
     {}
 
+    @Override
     public Event peek()
     {
         return head;
     }
 
+    @Override
     public void pop()
     {
         if (head != null) {
@@ -87,4 +86,9 @@ public class CollectorImpl implements Collector
         return event;
     }
 
+    @Override
+    public boolean more() {   // true only when head exists and its next link is set
+        return !(head == null || head.next == null);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
index 17ffde7..eecc05e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
@@ -29,7 +29,7 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.*;
 import org.apache.qpid.proton.amqp.transport.Open;
 
-public class ConnectionImpl extends EndpointImpl implements ProtonJConnection
+public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnection
 {
     public static final int MAX_CHANNELS = 65535;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
index 3317bf9..65a2000 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
@@ -20,13 +20,18 @@
  */
 package org.apache.qpid.proton.engine.impl;
 
+import java.util.Iterator;
+
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Handler;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
+import org.apache.qpid.proton.reactor.Task;
 
 /**
  * EventImpl
@@ -57,16 +62,19 @@ class EventImpl implements Event
         context = null;
     }
 
+    @Override
     public Type getType()
     {
         return type;
     }
 
+    @Override
     public Object getContext()
     {
         return context;
     }
 
+    @Override
     public void dispatch(Handler handler)
     {
         switch (type) {
@@ -157,12 +165,51 @@ class EventImpl implements Event
         case TRANSPORT_CLOSED:
             handler.onTransportClosed(this);
             break;
+        case REACTOR_FINAL:
+            handler.onReactorFinal(this);
+            break;
+        case REACTOR_QUIESCED:
+            handler.onReactorQuiesced(this);
+            break;
+        case REACTOR_INIT:
+            handler.onReactorInit(this);
+            break;
+        case SELECTABLE_ERROR:
+            handler.onSelectableError(this);
+            break;
+        case SELECTABLE_EXPIRED:
+            handler.onSelectableExpired(this);
+            break;
+        case SELECTABLE_FINAL:
+            handler.onSelectableFinal(this);
+            break;
+        case SELECTABLE_INIT:
+            handler.onSelectableInit(this);
+            break;
+        case SELECTABLE_READABLE:
+            handler.onSelectableReadable(this);
+            break;
+        case SELECTABLE_UPDATED:
+            handler.onSelectableUpdated(this);
+            break;
+        case SELECTABLE_WRITABLE:
+            handler.onSelectableWritable(this);
+            break;
+        case TIMER_TASK:
+            handler.onTimerTask(this);
+            break;
         default:
             handler.onUnhandled(this);
             break;
         }
+
+        Iterator<Handler> children = handler.children();
+        while(children.hasNext()) {
+            dispatch(children.next());
+        }
     }
 
+    @Override
     public Connection getConnection()
     {
         if (context instanceof Connection) {
@@ -182,6 +229,7 @@ class EventImpl implements Event
         }
     }
 
+    @Override
     public Session getSession()
     {
         if (context instanceof Session) {
@@ -195,6 +243,7 @@ class EventImpl implements Event
         }
     }
 
+    @Override
     public Link getLink()
     {
         if (context instanceof Link) {
@@ -208,6 +257,7 @@ class EventImpl implements Event
         }
     }
 
+    @Override
     public Delivery getDelivery()
     {
         if (context instanceof Delivery) {
@@ -217,6 +267,7 @@ class EventImpl implements Event
         }
     }
 
+    @Override
     public Transport getTransport()
     {
         if (context instanceof Transport) {
@@ -225,6 +276,52 @@ class EventImpl implements Event
             return null;
         }
     }
+
+    /**
+     * Returns the event context as a {@link Selectable}, or {@code null}
+     * when the context is of any other type.
+     */
+    @Override
+    public Selectable getSelectable() {
+        return (context instanceof Selectable) ? (Selectable) context : null;
+    }
+
+    @Override
+    public Reactor getReactor() {
+        // Engine contexts reach their reactor through a transport, which may be
+        // absent (e.g. getTransport() returning null); stay null in that case.
+        Transport transport = null;
+        if (context instanceof Reactor) {
+            return (Reactor) context;
+        } else if (context instanceof Task) {
+            return ((Task)context).getReactor();
+        } else if (context instanceof Transport) {
+            transport = (Transport) context;
+        } else if (context instanceof Delivery) {
+            transport = ((Delivery)context).getLink().getSession().getConnection().getTransport();
+        } else if (context instanceof Link) {
+            transport = ((Link)context).getSession().getConnection().getTransport();
+        } else if (context instanceof Session) {
+            transport = ((Session)context).getConnection().getTransport();
+        } else if (context instanceof Connection) {
+            transport = ((Connection)context).getTransport();
+        } else if (context instanceof Selectable) {
+            return ((Selectable)context).getReactor();
+        }
+        // A missing transport means no reactor; return null rather than dereference it.
+        return (transport == null) ? null : ((TransportImpl)transport).getReactor();
+    }
+
+    /**
+     * Returns the event context as a {@link Task}, or {@code null} when the
+     * context is of any other type.
+     */
+    @Override
+    public Task getTask() {
+        return (context instanceof Task) ? (Task) context : null;
+    }
+
+    @Override
     public Event copy()
     {
        EventImpl newEvent = new EventImpl();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java
new file mode 100644
index 0000000..a108412
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.engine.impl;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.engine.HandlerEndpoint;
+
+
+public abstract class HandlerEndpointImpl extends EndpointImpl implements HandlerEndpoint {
+
+    private Handler handler = null;
+
+    @Override
+    public void add(Handler handler) {
+        if (this.handler == null) {
+            this.handler = new BaseHandler();
+        }
+        this.handler.add(handler);
+    }
+
+    public Handler getHandler() {
+        return handler;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
index 9966526..af92fb8 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
@@ -21,15 +21,16 @@
 package org.apache.qpid.proton.engine.impl;
 
 import java.util.EnumSet;
+
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.amqp.transport.Source;
 import org.apache.qpid.proton.amqp.transport.Target;
+import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
 
-public abstract class LinkImpl extends EndpointImpl implements Link
+public abstract class LinkImpl extends HandlerEndpointImpl implements Link
 {
 
     private final SessionImpl _session;
@@ -68,16 +69,19 @@ public abstract class LinkImpl extends EndpointImpl implements Link
     }
 
 
+    @Override
     public String getName()
     {
         return _name;
     }
 
+    @Override
     public DeliveryImpl delivery(byte[] tag)
     {
         return delivery(tag, 0, tag.length);
     }
 
+    @Override
     public DeliveryImpl delivery(byte[] tag, int offset, int length)
     {
         if (offset != 0 || length != tag.length)
@@ -146,11 +150,13 @@ public abstract class LinkImpl extends EndpointImpl implements Link
         }
     }
 
+    @Override
     public DeliveryImpl current()
     {
         return _current;
     }
 
+    @Override
     public boolean advance()
     {
         if(_current != null )
@@ -178,11 +184,13 @@ public abstract class LinkImpl extends EndpointImpl implements Link
         return _session.getConnectionImpl();
     }
 
+    @Override
     public SessionImpl getSession()
     {
         return _session;
     }
 
+    @Override
     public Source getRemoteSource()
     {
         return _remoteSource;
@@ -193,6 +201,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
         _remoteSource = source;
     }
 
+    @Override
     public Target getRemoteTarget()
     {
         return _remoteTarget;
@@ -203,28 +212,33 @@ public abstract class LinkImpl extends EndpointImpl implements Link
         _remoteTarget = target;
     }
 
+    @Override
     public Source getSource()
     {
         return _source;
     }
 
+    @Override
     public void setSource(Source source)
     {
         // TODO - should be an error if local state is ACTIVE
         _source = source;
     }
 
+    @Override
     public Target getTarget()
     {
         return _target;
     }
 
+    @Override
     public void setTarget(Target target)
     {
         // TODO - should be an error if local state is ACTIVE
         _target = target;
     }
 
+    @Override
     public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
     {
         LinkNode.Query<LinkImpl> query = new EndpointImplQuery<LinkImpl>(local, remote);
@@ -237,6 +251,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
 
     abstract TransportLink getTransportLink();
 
+    @Override
     public int getCredit()
     {
         return _credit;
@@ -267,6 +282,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
         _credit--;
     }
 
+    @Override
     public int getQueued()
     {
         return _queued;
@@ -282,6 +298,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
         _queued--;
     }
 
+    @Override
     public int getUnsettled()
     {
         return _unsettled;
@@ -302,6 +319,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
         _drain = drain;
     }
 
+    @Override
     public boolean getDrain()
     {
         return _drain;
@@ -354,6 +372,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
         _remoteReceiverSettleMode = remoteReceiverSettleMode;
     }
 
+    @Override
     public int drained()
     {
         int drained = 0;
@@ -384,11 +403,13 @@ public abstract class LinkImpl extends EndpointImpl implements Link
         _drained = value;
     }
 
+    @Override
     public int getRemoteCredit()
     {
         return _credit - _queued;
     }
 
+    @Override
     public DeliveryImpl head()
     {
         return _head;
@@ -406,6 +427,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
         getConnectionImpl().put(Event.Type.LINK_LOCAL_CLOSE, this);
     }
 
+    @Override
     public void detach()
     {
         _detached = true;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
index 45fcb70..9969b93 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
@@ -20,14 +20,18 @@
  */
 package org.apache.qpid.proton.engine.impl;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.ProtonJSession;
 import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.Event;
 
-public class SessionImpl extends EndpointImpl implements ProtonJSession
+public class SessionImpl extends HandlerEndpointImpl implements ProtonJSession
 {
     private final ConnectionImpl _connection;
 
@@ -52,6 +56,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
         _connection.put(Event.Type.SESSION_INIT, this);
     }
 
+    @Override
     public SenderImpl sender(String name)
     {
         SenderImpl sender = _senders.get(name);
@@ -74,6 +79,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
         return sender;
     }
 
+    @Override
     public ReceiverImpl receiver(String name)
     {
         ReceiverImpl receiver = _receivers.get(name);
@@ -96,6 +102,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
         return receiver;
     }
 
+    @Override
     public Session next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
     {
         LinkNode.Query<SessionImpl> query = new EndpointImplQuery<SessionImpl>(local, remote);
@@ -111,6 +118,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
         return _connection;
     }
 
+    @Override
     public ConnectionImpl getConnection()
     {
         return getConnectionImpl();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index a5c8ba9..f4813cd 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -57,6 +57,8 @@ import org.apache.qpid.proton.engine.TransportResult;
 import org.apache.qpid.proton.engine.TransportResultFactory;
 import org.apache.qpid.proton.engine.impl.ssl.SslImpl;
 import org.apache.qpid.proton.framing.TransportFrame;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
 
 public class TransportImpl extends EndpointImpl
     implements ProtonJTransport, FrameBody.FrameBodyHandler<Integer>,
@@ -129,6 +131,9 @@ public class TransportImpl extends EndpointImpl
     private long _lastBytesOutput = 0;
     private long _remoteIdleDeadline = 0;
 
+    private Selectable _selectable;
+    private Reactor _reactor;
+
     /**
      * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
      * Client code outside this module should use a {@link EngineFactory} instead
@@ -1438,14 +1443,17 @@ public class TransportImpl extends EndpointImpl
         }
     }
 
+    @Override
     public void setIdleTimeout(int timeout) {
         _localIdleTimeout = timeout;
     }
 
+    @Override
     public int getIdleTimeout() {
         return _localIdleTimeout;
     }
 
+    @Override
     public int getRemoteIdleTimeout() {
         return _remoteIdleTimeout;
     }
@@ -1525,6 +1533,7 @@ public class TransportImpl extends EndpointImpl
         _outputProcessor.close_head();
     }
 
+    @Override
     public boolean isClosed() {
         int p = pending();
         int c = capacity();
@@ -1588,4 +1597,20 @@ public class TransportImpl extends EndpointImpl
 
     @Override
     void localClose() {}
+
+    public void setSelectable(Selectable selectable) {
+        _selectable = selectable;
+    }
+
+    public Selectable getSelectable() {
+        return _selectable;
+    }
+
+    public void setReactor(Reactor reactor) {
+        _reactor = reactor;
+    }
+
+    public Reactor getReactor() {
+        return _reactor;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
new file mode 100644
index 0000000..02c5de2
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.reactor.impl.ReactorImpl;
+
+
+
+public interface Reactor {
+
+    public static final class Factory
+    {
+        public static Reactor create() throws IOException {
+            return new ReactorImpl();
+        }
+    }
+
+    public long mark();
+    public long now();
+    public void attach(Object attachment);
+    public Object attachment();
+    public long getTimeout();
+
+    public void setTimeout(long timeout);
+
+    public Handler getGlobalHandler();
+
+    public void setGlobalHandler(Handler handler);
+
+    public Handler getHandler();
+
+    public void setHandler(Handler handler);
+
+/* TODO
+ * pn_io_t *pn_reactor_io(pn_reactor_t *reactor) {
+ *   assert(reactor);
+ *   return reactor->io;
+ * }
+ *
+
+ */
+
+    public Set<Selectable> children();
+
+    public Collector collector();
+
+
+    public Selectable selectable();
+
+
+
+    public void update(Selectable selectable);
+
+
+    void yield() ;
+
+    public boolean quiesced();
+
+    public boolean process();
+
+    public void wakeup() throws IOException;
+
+    public void start() ;
+
+    public void stop() ;
+
+    public void run();
+
+    // pn_reactor_schedule from reactor.c
+    public Task schedule(int delay, Handler handler);
+    // TODO: acceptor
+    // TODO: connection
+    // TODO: acceptorClose
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
new file mode 100644
index 0000000..c2b560f
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+import java.nio.channels.SelectableChannel;
+
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.engine.Transport;
+
+public interface Selectable {
+
+    public interface Callback {
+        void run(Selectable selectable);
+    }
+
+    public boolean isReading();
+
+    boolean isWriting();
+
+    long getDeadline() ;
+
+    void setReading(boolean reading) ;
+
+    void setWriting(boolean writing);
+
+    void setDeadline(long deadline) ;
+
+    public void onReadable(Callback runnable) ;
+
+    public void onWritable(Callback runnable);
+
+    public void onExpired(Callback runnable);
+
+    public void onError(Callback runnable);
+
+    public void onRelease(Callback runnable);
+
+    public void onFinalize(Callback runnable);
+
+    void readable() ;
+
+    void writeable() ;
+
+    void expired() ;
+
+    void error();
+
+    void release() ;
+
+    void _finalize() ;
+
+    // These are equivalent to the C code's set/get file descriptor functions.
+    void setChannel(SelectableChannel channel) ;
+
+    public SelectableChannel getChannel() ;
+
+    void setAttachment(Object attachment) ;
+
+    Object getAttachment() ;
+
+    boolean isRegistered() ;
+
+    void setRegistered(boolean registered) ;
+
+    void setCollector(final Collector collector) ;
+
+    public Reactor getReactor() ;
+    public void terminate() ;
+
+    public enum RecordKeyType {
+        PNI_TERMINATED
+    }
+
+    public enum RecordValueType {
+        PN_VOID
+    }
+
+    public boolean hasRecord(RecordKeyType type);
+
+    public void setRecord(RecordKeyType key, RecordValueType value) ;
+    public boolean isTerminal();
+
+
+    public Transport getTransport() ;
+
+    public void setTransport(Transport transport) ;
+
+    public void setReactor(Reactor reactor) ;
+
+    public void add(Handler handler);
+
+    public Handler getHandler() ;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
new file mode 100644
index 0000000..12188e2
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public interface Selector {
+
+    public void add(Selectable selectable) throws IOException ;
+
+    public void update(Selectable selectable);
+
+    public void remove(Selectable selectable) ;
+
+    public void select(long timeout) throws IOException ;
+
+    public Iterator<Selectable> readable() ;
+
+    public Iterator<Selectable> writeable() ;
+
+    public Iterator<Selectable> expired() ;
+    public Iterator<Selectable> error() ;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
new file mode 100644
index 0000000..88031c5
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+import org.apache.qpid.proton.engine.Handler;
+
+public interface Task {
+
+    public long deadline();
+    public void setReactor(Reactor reactor) ;
+    public Reactor getReactor();
+    public void setHandler(Handler handler) ;
+    public Handler getHandler() ;
+    public Object getAttachment() ;
+    public void setAttachment(Object attachment) ;
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org