You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/07/06 01:45:03 UTC
[02/38] qpid-proton git commit: PROTON-881: Initial commit of
proton-j reactor implementation
PROTON-881: Initial commit of proton-j reactor implementation
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e0187017
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e0187017
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e0187017
Branch: refs/heads/master
Commit: e0187017456a4df58df2f1d04b1941d99eacbe10
Parents: cb4f9b9
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Fri Apr 17 14:33:11 2015 +0100
Committer: Adrian Preston <pr...@uk.ibm.com>
Committed: Wed May 6 23:22:27 2015 +0100
----------------------------------------------------------------------
examples/java/reactor/.gitignore | 1 +
.../proton/example/reactor/CountRandomly.java | 104 +++++
.../qpid/proton/example/reactor/Counter.java | 81 ++++
.../qpid/proton/example/reactor/Delegates.java | 68 +++
.../example/reactor/EchoInputStreamWrapper.java | 76 ++++
.../proton/example/reactor/GlobalLogger.java | 114 +++++
.../proton/example/reactor/GoodbyeWorld.java | 62 +++
.../qpid/proton/example/reactor/HelloWorld.java | 60 +++
.../proton/example/reactor/ReactorLogger.java | 103 +++++
.../qpid/proton/example/reactor/Scheduling.java | 60 +++
.../qpid/proton/example/reactor/Unhandled.java | 46 ++
.../java/org/apache/qpid/proton/Proton.java | 20 +-
.../apache/qpid/proton/engine/BaseHandler.java | 28 ++
.../apache/qpid/proton/engine/Collector.java | 1 +
.../apache/qpid/proton/engine/Connection.java | 4 +-
.../org/apache/qpid/proton/engine/Event.java | 10 +
.../org/apache/qpid/proton/engine/Handler.java | 18 +
.../qpid/proton/engine/HandlerEndpoint.java | 28 ++
.../org/apache/qpid/proton/engine/Link.java | 4 +-
.../org/apache/qpid/proton/engine/Session.java | 2 +-
.../qpid/proton/engine/impl/CollectorImpl.java | 10 +-
.../qpid/proton/engine/impl/ConnectionImpl.java | 2 +-
.../qpid/proton/engine/impl/EventImpl.java | 103 ++++-
.../proton/engine/impl/HandlerEndpointImpl.java | 44 ++
.../qpid/proton/engine/impl/LinkImpl.java | 28 +-
.../qpid/proton/engine/impl/SessionImpl.java | 14 +-
.../qpid/proton/engine/impl/TransportImpl.java | 25 ++
.../org/apache/qpid/proton/reactor/Reactor.java | 99 +++++
.../apache/qpid/proton/reactor/Selectable.java | 113 +++++
.../apache/qpid/proton/reactor/Selector.java | 43 ++
.../org/apache/qpid/proton/reactor/Task.java | 35 ++
.../qpid/proton/reactor/impl/IOHandler.java | 333 ++++++++++++++
.../qpid/proton/reactor/impl/ReactorImpl.java | 445 +++++++++++++++++++
.../proton/reactor/impl/SelectableImpl.java | 272 ++++++++++++
.../qpid/proton/reactor/impl/SelectorImpl.java | 137 ++++++
.../qpid/proton/reactor/impl/TaskImpl.java | 83 ++++
.../apache/qpid/proton/reactor/impl/Timer.java | 70 +++
37 files changed, 2725 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/.gitignore
----------------------------------------------------------------------
diff --git a/examples/java/reactor/.gitignore b/examples/java/reactor/.gitignore
new file mode 100644
index 0000000..5e56e04
--- /dev/null
+++ b/examples/java/reactor/.gitignore
@@ -0,0 +1 @@
+/bin
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
new file mode 100644
index 0000000..0dcdf4a
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// Let's try to modify our counter example. In addition to counting to
+// 10 in quarter second intervals, let's also print out a random number
+// every half second. This is not a super easy thing to express in a
+// purely sequential program, but not so difficult using events.
+public class CountRandomly extends BaseHandler {
+
+ private long startTime;
+ private CounterHandler counter;
+
+ class CounterHandler extends BaseHandler {
+ private final int limit;
+ private int count;
+
+ CounterHandler(int limit) {
+ this.limit = limit;
+ }
+
+ @Override
+ public void onTimerTask(Event event) {
+ count += 1;
+ System.out.println(count);
+
+ if (!done()) {
+ event.getReactor().schedule(250, this);
+ }
+ }
+
+ private boolean done() {
+ return count >= limit;
+ }
+ }
+
+ @Override
+ public void onReactorInit(Event event) {
+ startTime = System.currentTimeMillis();
+ System.out.println("Hello, World!");
+
+ // Save the counter instance in an attribute so we can refer to
+ // it later.
+ counter = new CounterHandler(10);
+ event.getReactor().schedule(250, counter);
+
+ // Now schedule another event with a different handler. Note
+ // that the timer tasks go to separate handlers, and they don't
+ // interfere with each other.
+ event.getReactor().schedule(500, this);
+ }
+
+ @Override
+ public void onTimerTask(Event event) {
+ // keep on shouting until we are done counting
+ System.out.println("Yay, " + Math.round(Math.abs((Math.random() * 110) - 10)));
+ if (!counter.done()) {
+ event.getReactor().schedule(500, this);
+ }
+ }
+
+ @Override
+ public void onReactorFinal(Event event) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ System.out.println("Goodbye, World! (after " + elapsedTime + " long milliseconds)");
+ }
+
+ public static void main(String[] args) throws IOException {
+ // In HelloWorld.java we said the reactor exits when there are no more
+ // events to process. While this is true, it's not actually complete.
+ // The reactor exits when there are no more events to process and no
+ // possibility of future events arising. For that reason the reactor
+ // will keep running until there are no more scheduled events and then
+ // exit.
+ Reactor reactor = Proton.reactor(new CountRandomly());
+ reactor.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
new file mode 100644
index 0000000..a34038e
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+public class Counter extends BaseHandler {
+
+ private long startTime;
+
+ class CounterHandler extends BaseHandler {
+ private final int limit;
+ private int count;
+
+ CounterHandler(int limit) {
+ this.limit = limit;
+ }
+
+ @Override
+ public void onTimerTask(Event event) {
+ count += 1;
+ System.out.println(count);
+ if (count < limit) {
+ event.getReactor().schedule(250, this);
+ }
+ }
+ }
+
+ @Override
+ public void onReactorInit(Event event) {
+ startTime = System.currentTimeMillis();
+ System.out.println("Hello, World!");
+
+ // Note that unlike the previous scheduling example, we pass in
+ // a separate object for the handler. This means that the timer
+ // event we just scheduled will not be seen by Program as it is
+ // being handled by the Counter instance we create.
+ event.getReactor().schedule(250, new CounterHandler(10));
+ }
+
+ @Override
+ public void onReactorFinal(Event event) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ System.out.println("Goodbye, World! (after " + elapsedTime + " long milliseconds)");
+ }
+
+ public static void main(String[] args) throws IOException {
+ // In HelloWorld.java we said the reactor exits when there are no more
+ // events to process. While this is true, it's not actually complete.
+ // The reactor exits when there are no more events to process and no
+ // possibility of future events arising. For that reason the reactor
+ // will keep running until there are no more scheduled events and then
+ // exit.
+ Reactor reactor = Proton.reactor(new Counter());
+ reactor.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
new file mode 100644
index 0000000..7b4e36f
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// Events know how to dispatch themselves to handlers. By combining
+// this with on_unhandled, you can provide a kind of inheritance
+/// between handlers using delegation.
+public class Delegates extends BaseHandler {
+
+ private final Handler[] handlers;
+
+ static class Hello extends BaseHandler {
+ @Override
+ public void onReactorInit(Event e) {
+ System.out.println("Hello, World!");
+ }
+ }
+
+ static class Goodbye extends BaseHandler {
+ @Override
+ public void onReactorFinal(Event e) {
+ System.out.println("Goodbye, World!");
+ }
+ }
+
+ public Delegates(Handler... handlers) {
+ this.handlers = handlers;
+ }
+
+ @Override
+ public void onUnhandled(Event event) {
+ for (Handler handler : handlers) {
+ event.dispatch(handler);
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ Reactor reactor = Proton.reactor(new Delegates(new Hello(), new Goodbye()));
+ reactor.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
new file mode 100644
index 0000000..2e53d09
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Pipe;
+import java.nio.channels.Pipe.SinkChannel;
+import java.nio.channels.Pipe.SourceChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class EchoInputStreamWrapper extends Thread {
+
+ private final InputStream in;
+ private final SinkChannel out;
+ private final byte[] bufferBytes = new byte[1024];
+ private final ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
+ private final AtomicInteger idCounter = new AtomicInteger();
+
+ private EchoInputStreamWrapper(InputStream in, SinkChannel out) {
+ this.in = in;
+ this.out = out;
+ setName(getClass().getName() + "-" + idCounter.incrementAndGet());
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ while(true) {
+ int amount = in.read(bufferBytes);
+ if (amount < 0) break;
+ buffer.position(0);
+ buffer.limit(amount);
+ out.write(buffer);
+ }
+ } catch(IOException ioException) {
+ ioException.printStackTrace();
+ } finally {
+ try {
+ out.close();
+ } catch(IOException ioException) {
+ ioException.printStackTrace();
+ }
+ }
+ }
+
+ public static SourceChannel wrap(InputStream in) throws IOException {
+ Pipe pipe = Pipe.open();
+ new EchoInputStreamWrapper(in, pipe.sink()).start();
+ SourceChannel result = pipe.source();
+ result.configureBlocking(false);
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
new file mode 100644
index 0000000..1bb3d3e
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+/*
+# Not every event goes to the reactor's event handler. If we have a
+# separate handler for something like a scheduled task, then those
+# events aren't logged by the logger associated with the reactor's
+# handler. Sometimes this is useful if you don't want to see them, but
+# sometimes you want the global picture.
+
+class Logger:
+
+ def on_unhandled(self, name, event):
+ print "LOG:", name, event
+
+class Task:
+
+ def on_timer_task(self, event):
+ print "Mission accomplished!"
+
+class Program:
+
+ def on_reactor_init(self, event):
+ print "Hello, World!"
+ event.reactor.schedule(0, Task())
+
+ def on_reactor_final(self, event):
+ print "Goodbye, World!"
+
+r = Reactor(Program())
+
+# In addition to having a regular handler, the reactor also has a
+# global handler that sees every event. By adding the Logger to the
+# global handler instead of the regular handler, we can log every
+# single event that occurs in the system regardless of whether or not
+# there are specific handlers associated with the objects that are the
+# target of those events.
+r.global_handler.add(Logger())
+r.run()
+
+ */
+
+// Not every event goes to the reactor's event handler. If we have a
+// separate handler for something like a scheduled task, then those
+// events aren't logged by the logger associated with the reactor's
+// handler. Sometimes this is useful if you don't want to see them, but
+// sometimes you want the global picture.
+public class GlobalLogger extends BaseHandler {
+
+ static class Logger extends BaseHandler {
+ @Override
+ public void onUnhandled(Event event) {
+ System.out.println("LOG: " + event);
+ }
+ }
+
+ static class Task extends BaseHandler {
+ @Override
+ public void onTimerTask(Event e) {
+ System.out.println("Mission accomplished!");
+ }
+ }
+
+ @Override
+ public void onReactorInit(Event event) {
+ System.out.println("Hello, World!");
+ event.getReactor().schedule(0, new Task());
+ }
+
+ @Override
+ public void onReactorFinal(Event e) {
+ System.out.println("Goodbye, World!");
+ }
+
+ public static void main(String[] args) throws IOException {
+ Reactor reactor = Proton.reactor(new GlobalLogger());
+
+ // In addition to having a regular handler, the reactor also has a
+ // global handler that sees every event. By adding the Logger to the
+ // global handler instead of the regular handler, we can log every
+ // single event that occurs in the system regardless of whether or not
+ // there are specific handlers associated with the objects that are the
+ // target of those events.
+ reactor.getGlobalHandler().add(new Logger());
+ reactor.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
new file mode 100644
index 0000000..b04273b
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// TODO: sort out docs!
+// So far the reactive hello-world doesn't look too different from a
+// regular old non-reactive hello-world. The on_reactor_init method can
+// be used roughly as a 'main' method would. A program that only uses
+// that one event, however, isn't going to be very reactive. By using
+// other events, we can write a fully reactive program.
+
+public class GoodbyeWorld extends BaseHandler {
+
+ // As before we handle the reactor init event.
+ @Override
+ public void onReactorInit(Event event) {
+ System.out.println("Hello, World!");
+ }
+
+ // In addition to an initial event, the reactor also produces an
+ // event when it is about to exit. This may not behave much
+ // differently than just putting the goodbye print statement inside
+ // on_reactor_init, but as we grow our program, this piece of it
+ // will always be what happens last, and will always happen
+ // regardless of what other paths the main logic of our program
+ // might take.
+ @Override
+ public void onReactorFinal(Event e) {
+ System.out.println("Goodbye, World!");;
+ }
+
+ public static void main(String[] args) throws IOException {
+ Reactor reactor = Proton.reactor(new GoodbyeWorld());
+ reactor.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
new file mode 100644
index 0000000..745004e
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// TODO: sort out docs!
+/*
+ * The proton reactor provides a general purpose event processing
+ * library for writing reactive programs. A reactive program is defined
+ * by a set of event handlers. An event handler is just any class or
+ * object that extends the Handler interface. For convinience, a class
+ * can extend BaseHandler and only handle the events that it cares to
+ * implement methods for.
+ */
+public class HelloWorld extends BaseHandler {
+
+ // The reactor init event is produced by the reactor itself when it
+ // starts.
+ @Override
+ public void onReactorInit(Event event) {
+ System.out.println("Hello, World!");
+ }
+
+ public static void main(String[] args) throws IOException {
+
+ // When you construct a reactor, you can give it a handler that
+ // is used, by default.
+ Reactor reactor = Proton.reactor(new HelloWorld());
+
+ // When you call run, the reactor will process events. The reactor init
+ // event is what kicks off everything else. When the reactor has no
+ // more events to process, it exits.
+ reactor.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
new file mode 100644
index 0000000..b4a8cba
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+/*
+class Logger:
+
+ def on_unhandled(self, name, event):
+ print "LOG:", name, event
+
+class Program:
+
+ def on_reactor_init(self, event):
+ print "Hello, World!"
+
+ def on_reactor_final(self, event):
+ print "Goodbye, World!"
+
+# You can pass multiple handlers to a reactor when you construct it.
+# Each of these handlers will see every event the reactor sees. By
+# combining this with on_unhandled, you can log each event that goes
+# to the reactor.
+r = Reactor(Program(), Logger())
+r.run()
+
+# Note that if you wanted to add the logger later, you could also
+# write the above as below. All arguments to the reactor are just
+# added to the default handler for the reactor.
+
+def logging_enabled():
+ return False
+
+r = Reactor(Program())
+if logging_enabled():
+ r.handler.add(Logger())
+r.run()
+
+ */
+public class ReactorLogger extends BaseHandler {
+
+ public static class Logger extends BaseHandler {
+ @Override
+ public void onUnhandled(Event event) {
+ System.out.println("LOG: " + event);
+ }
+ }
+
+ @Override
+ public void onReactorInit(Event e) {
+ System.out.println("Hello, World!");
+ }
+
+ @Override
+ public void onReactorFinal(Event e) {
+ System.out.println("Goodbye, World!");
+ }
+
+ private static boolean loggingEnabled = false;
+
+ public static void main(String[] args) throws IOException {
+
+ // You can pass multiple handlers to a reactor when you construct it.
+ // Each of these handlers will see every event the reactor sees. By
+ // combining this with on_unhandled, you can log each event that goes
+ // to the reactor.
+ Reactor reactor = Proton.reactor(new ReactorLogger(), new Logger());
+ reactor.run();
+
+ // Note that if you wanted to add the logger later, you could also
+ // write the above as below. All arguments to the reactor are just
+ // added to the default handler for the reactor.
+ reactor = Proton.reactor(new ReactorLogger());
+ if (loggingEnabled)
+ reactor.getHandler().add(new Logger());
+ reactor.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
new file mode 100644
index 0000000..47e0cb3
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Task;
+
+public class Scheduling extends BaseHandler {
+
+ private long startTime;
+
+ @Override
+ public void onReactorInit(Event event) {
+ startTime = System.currentTimeMillis();
+ System.out.println("Hello, World!");
+ Task task = event.getReactor().schedule(1000, this);
+ task.setAttachment("Yay");
+ }
+
+ @Override
+ public void onTimerTask(Event event) {
+ Task task = event.getTask();
+ System.out.println(task.getAttachment() + " my task is complete!");
+ }
+
+ @Override
+ public void onReactorFinal(Event e) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ System.out.println("Goodbye, World! (after " + elapsedTime + " long milliseconds)");
+ }
+
+ public static void main(String[] args) throws IOException {
+ Reactor reactor = Proton.reactor(new Scheduling());
+ reactor.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
new file mode 100644
index 0000000..a3cc200
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+public class Unhandled extends BaseHandler {
+
+ // If an event occurs and its handler doesn't have an on_<event>
+ // method, the reactor will attempt to call the on_unhandled method
+ // if it exists. This can be useful not only for debugging, but for
+ // logging and for delegating/inheritance.
+ @Override
+ public void onUnhandled(Event event) {
+ System.out.println(event);
+ }
+
+ public static void main(String[] args) throws IOException {
+ Reactor reactor = Proton.reactor(new Unhandled());
+ reactor.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
index 39b04e5..b64225a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
@@ -21,8 +21,7 @@
package org.apache.qpid.proton;
import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Footer;
@@ -33,14 +32,16 @@ import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.Codec;
import org.apache.qpid.proton.codec.Data;
import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.engine.Engine;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Engine;
+import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.SslPeerDetails;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.reactor.Reactor;
public final class Proton
{
@@ -110,4 +111,17 @@ public final class Proton
return Driver.Factory.create();
}
+ public static Reactor reactor() throws IOException
+ {
+ return Reactor.Factory.create();
+ }
+
+ public static Reactor reactor(Handler... handlers) throws IOException
+ {
+ Reactor reactor = Reactor.Factory.create();
+ for (Handler handler : handlers) {
+ reactor.getHandler().add(handler);
+ }
+ return reactor;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java
index 94f4d12..ac17c5e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.proton.engine;
+import java.util.HashSet;
+import java.util.Iterator;
+
/**
* BaseHandler
@@ -28,6 +31,7 @@ package org.apache.qpid.proton.engine;
public class BaseHandler implements Handler
{
+ private HashSet<Handler> children = new HashSet<Handler>();
@Override public void onConnectionInit(Event e) { onUnhandled(e); }
@Override public void onConnectionLocalOpen(Event e) { onUnhandled(e); }
@@ -62,6 +66,30 @@ public class BaseHandler implements Handler
@Override public void onTransportTailClosed(Event e) { onUnhandled(e); }
@Override public void onTransportClosed(Event e) { onUnhandled(e); }
+ @Override public void onReactorInit(Event e) { onUnhandled(e); }
+ @Override public void onReactorQuiesced(Event e) { onUnhandled(e); }
+ @Override public void onReactorFinal(Event e) { onUnhandled(e); }
+
+ @Override public void onTimerTask(Event e) { onUnhandled(e); }
+
+ @Override public void onSelectableInit(Event e) { onUnhandled(e); }
+ @Override public void onSelectableUpdated(Event e) { onUnhandled(e); }
+ @Override public void onSelectableReadable(Event e) { onUnhandled(e); }
+ @Override public void onSelectableWritable(Event e) { onUnhandled(e); }
+ @Override public void onSelectableExpired(Event e) { onUnhandled(e); }
+ @Override public void onSelectableError(Event e) { onUnhandled(e); }
+ @Override public void onSelectableFinal(Event e) { onUnhandled(e); }
+
@Override public void onUnhandled(Event event) {}
+ @Override
+ public void add(Handler child) {
+ children.add(child);
+ }
+
+ @Override
+ public Iterator<Handler> children() {
+ return children.iterator();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java
index f9e6fe5..142406e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java
@@ -41,4 +41,5 @@ public interface Collector
void pop();
+ boolean more();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
index 5547a57..5cb57a2 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
@@ -22,8 +22,8 @@ package org.apache.qpid.proton.engine;
import java.util.EnumSet;
import java.util.Map;
-import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
@@ -35,7 +35,7 @@ import org.apache.qpid.proton.engine.impl.ConnectionImpl;
* {@link #sessionHead(EnumSet, EnumSet)}, {@link #linkHead(EnumSet, EnumSet)}
* {@link #getWorkHead()} respectively.
*/
-public interface Connection extends Endpoint
+public interface Connection extends HandlerEndpoint
{
public static final class Factory
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
index ddb6937..d69b282 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.proton.engine;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
+import org.apache.qpid.proton.reactor.Task;
+
/**
* Event
@@ -95,6 +99,12 @@ public interface Event
Transport getTransport();
+ Reactor getReactor();
+
+ Selectable getSelectable();
+
+ Task getTask();
+
Event copy();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java
index 5ff77e0..fe72091 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.proton.engine;
+import java.util.Iterator;
+
/**
* Handler
@@ -62,6 +64,22 @@ public interface Handler
void onTransportTailClosed(Event e);
void onTransportClosed(Event e);
+ void onReactorInit(Event e);
+ void onReactorQuiesced(Event e);
+ void onReactorFinal(Event e);
+
+ void onTimerTask(Event e);
+
+ void onSelectableInit(Event e);
+ void onSelectableUpdated(Event e);
+ void onSelectableReadable(Event e);
+ void onSelectableWritable(Event e);
+ void onSelectableExpired(Event e);
+ void onSelectableError(Event e);
+ void onSelectableFinal(Event e);
+
void onUnhandled(Event e);
+ void add(Handler child);
+ Iterator<Handler> children();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java
new file mode 100644
index 0000000..ecadc0a
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.engine;
+
+public interface HandlerEndpoint extends Endpoint {
+
+ void add(Handler handler);
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
index c965a29..caafc14 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
@@ -21,7 +21,7 @@
package org.apache.qpid.proton.engine;
import java.util.EnumSet;
-import java.util.Iterator;
+
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.transport.Source;
@@ -37,7 +37,7 @@ import org.apache.qpid.proton.amqp.transport.Target;
*
* TODO describe the application's responsibility to honour settlement.
*/
-public interface Link extends Endpoint
+public interface Link extends HandlerEndpoint
{
/**
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
index f2f048a..eaddac0 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
@@ -28,7 +28,7 @@ import java.util.EnumSet;
*
* Note that session level flow control is handled internally by Proton.
*/
-public interface Session extends Endpoint
+public interface Session extends HandlerEndpoint
{
/**
* Returns a newly created sender endpoint
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
index e222819..fe09a23 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
@@ -23,9 +23,6 @@ package org.apache.qpid.proton.engine.impl;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Event;
-import java.util.LinkedList;
-import java.util.Queue;
-
/**
* CollectorImpl
@@ -42,11 +39,13 @@ public class CollectorImpl implements Collector
public CollectorImpl()
{}
+ @Override
public Event peek()
{
return head;
}
+ @Override
public void pop()
{
if (head != null) {
@@ -87,4 +86,9 @@ public class CollectorImpl implements Collector
return event;
}
+ @Override
+ public boolean more() {
+ return head != null && head.next != null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
index 17ffde7..eecc05e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
@@ -29,7 +29,7 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.amqp.transport.Open;
-public class ConnectionImpl extends EndpointImpl implements ProtonJConnection
+public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnection
{
public static final int MAX_CHANNELS = 65535;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
index 3317bf9..65a2000 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
@@ -20,13 +20,18 @@
*/
package org.apache.qpid.proton.engine.impl;
+import java.util.Iterator;
+
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
+import org.apache.qpid.proton.reactor.Task;
/**
* EventImpl
@@ -57,16 +62,19 @@ class EventImpl implements Event
context = null;
}
+ @Override
public Type getType()
{
return type;
}
+ @Override
public Object getContext()
{
return context;
}
+ @Override
public void dispatch(Handler handler)
{
switch (type) {
@@ -157,12 +165,51 @@ class EventImpl implements Event
case TRANSPORT_CLOSED:
handler.onTransportClosed(this);
break;
+ case REACTOR_FINAL:
+ handler.onReactorFinal(this);
+ break;
+ case REACTOR_QUIESCED:
+ handler.onReactorQuiesced(this);
+ break;
+ case REACTOR_INIT:
+ handler.onReactorInit(this);
+ break;
+ case SELECTABLE_ERROR:
+ handler.onSelectableError(this);
+ break;
+ case SELECTABLE_EXPIRED:
+ handler.onSelectableExpired(this);
+ break;
+ case SELECTABLE_FINAL:
+ handler.onSelectableFinal(this);
+ break;
+ case SELECTABLE_INIT:
+ handler.onSelectableInit(this);
+ break;
+ case SELECTABLE_READABLE:
+ handler.onSelectableReadable(this);
+ break;
+ case SELECTABLE_UPDATED:
+ handler.onSelectableWritable(this);
+ break;
+ case SELECTABLE_WRITABLE:
+ handler.onSelectableWritable(this);
+ break;
+ case TIMER_TASK:
+ handler.onTimerTask(this);
+ break;
default:
handler.onUnhandled(this);
break;
}
+
+ Iterator<Handler> children = handler.children();
+ while(children.hasNext()) {
+ dispatch(children.next());
+ }
}
+ @Override
public Connection getConnection()
{
if (context instanceof Connection) {
@@ -182,6 +229,7 @@ class EventImpl implements Event
}
}
+ @Override
public Session getSession()
{
if (context instanceof Session) {
@@ -195,6 +243,7 @@ class EventImpl implements Event
}
}
+ @Override
public Link getLink()
{
if (context instanceof Link) {
@@ -208,6 +257,7 @@ class EventImpl implements Event
}
}
+ @Override
public Delivery getDelivery()
{
if (context instanceof Delivery) {
@@ -217,6 +267,7 @@ class EventImpl implements Event
}
}
+ @Override
public Transport getTransport()
{
if (context instanceof Transport) {
@@ -225,6 +276,52 @@ class EventImpl implements Event
return null;
}
}
+
+ @Override
+ public Selectable getSelectable() {
+ if (context instanceof Selectable) {
+ return (Selectable) context;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Reactor getReactor() {
+ if (context instanceof Reactor) {
+ return (Reactor) context;
+ } else if (context instanceof Task) {
+ return ((Task)context).getReactor();
+ } else if (context instanceof Transport) {
+ return ((TransportImpl)context).getReactor();
+ } else if (context instanceof Delivery) {
+ Transport transport = ((Delivery)context).getLink().getSession().getConnection().getTransport();
+ return ((TransportImpl)transport).getReactor();
+ } else if (context instanceof Link) {
+ Transport transport = ((Link)context).getSession().getConnection().getTransport();
+ return ((TransportImpl)transport).getReactor();
+ } else if (context instanceof Session) {
+ Transport transport = ((Session)context).getConnection().getTransport();
+ return ((TransportImpl)transport).getReactor();
+ } else if (context instanceof Connection) {
+ Transport transport = ((Connection)context).getTransport();
+ return ((TransportImpl)transport).getReactor();
+ } else if (context instanceof Selectable) {
+ return ((Selectable)context).getReactor();
+ }
+ return null;
+ }
+
+ @Override
+ public Task getTask() {
+ if (context instanceof Task) {
+ return (Task) context;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
public Event copy()
{
EventImpl newEvent = new EventImpl();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java
new file mode 100644
index 0000000..a108412
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.engine.impl;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.engine.HandlerEndpoint;
+
+
+public abstract class HandlerEndpointImpl extends EndpointImpl implements HandlerEndpoint {
+
+ private Handler handler = null;
+
+ @Override
+ public void add(Handler handler) {
+ if (this.handler == null) {
+ this.handler = new BaseHandler();
+ }
+ this.handler.add(handler);
+ }
+
+ public Handler getHandler() {
+ return handler;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
index 9966526..af92fb8 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
@@ -21,15 +21,16 @@
package org.apache.qpid.proton.engine.impl;
import java.util.EnumSet;
+
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.amqp.transport.Target;
+import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
-public abstract class LinkImpl extends EndpointImpl implements Link
+public abstract class LinkImpl extends HandlerEndpointImpl implements Link
{
private final SessionImpl _session;
@@ -68,16 +69,19 @@ public abstract class LinkImpl extends EndpointImpl implements Link
}
+ @Override
public String getName()
{
return _name;
}
+ @Override
public DeliveryImpl delivery(byte[] tag)
{
return delivery(tag, 0, tag.length);
}
+ @Override
public DeliveryImpl delivery(byte[] tag, int offset, int length)
{
if (offset != 0 || length != tag.length)
@@ -146,11 +150,13 @@ public abstract class LinkImpl extends EndpointImpl implements Link
}
}
+ @Override
public DeliveryImpl current()
{
return _current;
}
+ @Override
public boolean advance()
{
if(_current != null )
@@ -178,11 +184,13 @@ public abstract class LinkImpl extends EndpointImpl implements Link
return _session.getConnectionImpl();
}
+ @Override
public SessionImpl getSession()
{
return _session;
}
+ @Override
public Source getRemoteSource()
{
return _remoteSource;
@@ -193,6 +201,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
_remoteSource = source;
}
+ @Override
public Target getRemoteTarget()
{
return _remoteTarget;
@@ -203,28 +212,33 @@ public abstract class LinkImpl extends EndpointImpl implements Link
_remoteTarget = target;
}
+ @Override
public Source getSource()
{
return _source;
}
+ @Override
public void setSource(Source source)
{
// TODO - should be an error if local state is ACTIVE
_source = source;
}
+ @Override
public Target getTarget()
{
return _target;
}
+ @Override
public void setTarget(Target target)
{
// TODO - should be an error if local state is ACTIVE
_target = target;
}
+ @Override
public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
{
LinkNode.Query<LinkImpl> query = new EndpointImplQuery<LinkImpl>(local, remote);
@@ -237,6 +251,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
abstract TransportLink getTransportLink();
+ @Override
public int getCredit()
{
return _credit;
@@ -267,6 +282,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
_credit--;
}
+ @Override
public int getQueued()
{
return _queued;
@@ -282,6 +298,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
_queued--;
}
+ @Override
public int getUnsettled()
{
return _unsettled;
@@ -302,6 +319,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
_drain = drain;
}
+ @Override
public boolean getDrain()
{
return _drain;
@@ -354,6 +372,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
_remoteReceiverSettleMode = remoteReceiverSettleMode;
}
+ @Override
public int drained()
{
int drained = 0;
@@ -384,11 +403,13 @@ public abstract class LinkImpl extends EndpointImpl implements Link
_drained = value;
}
+ @Override
public int getRemoteCredit()
{
return _credit - _queued;
}
+ @Override
public DeliveryImpl head()
{
return _head;
@@ -406,6 +427,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link
getConnectionImpl().put(Event.Type.LINK_LOCAL_CLOSE, this);
}
+ @Override
public void detach()
{
_detached = true;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
index 45fcb70..9969b93 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
@@ -20,14 +20,18 @@
*/
package org.apache.qpid.proton.engine.impl;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.ProtonJSession;
import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.Event;
-public class SessionImpl extends EndpointImpl implements ProtonJSession
+public class SessionImpl extends HandlerEndpointImpl implements ProtonJSession
{
private final ConnectionImpl _connection;
@@ -52,6 +56,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
_connection.put(Event.Type.SESSION_INIT, this);
}
+ @Override
public SenderImpl sender(String name)
{
SenderImpl sender = _senders.get(name);
@@ -74,6 +79,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
return sender;
}
+ @Override
public ReceiverImpl receiver(String name)
{
ReceiverImpl receiver = _receivers.get(name);
@@ -96,6 +102,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
return receiver;
}
+ @Override
public Session next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
{
LinkNode.Query<SessionImpl> query = new EndpointImplQuery<SessionImpl>(local, remote);
@@ -111,6 +118,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
return _connection;
}
+ @Override
public ConnectionImpl getConnection()
{
return getConnectionImpl();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index a5c8ba9..f4813cd 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -57,6 +57,8 @@ import org.apache.qpid.proton.engine.TransportResult;
import org.apache.qpid.proton.engine.TransportResultFactory;
import org.apache.qpid.proton.engine.impl.ssl.SslImpl;
import org.apache.qpid.proton.framing.TransportFrame;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
public class TransportImpl extends EndpointImpl
implements ProtonJTransport, FrameBody.FrameBodyHandler<Integer>,
@@ -129,6 +131,9 @@ public class TransportImpl extends EndpointImpl
private long _lastBytesOutput = 0;
private long _remoteIdleDeadline = 0;
+ private Selectable _selectable;
+ private Reactor _reactor;
+
/**
* @deprecated This constructor's visibility will be reduced to the default scope in a future release.
* Client code outside this module should use a {@link EngineFactory} instead
@@ -1438,14 +1443,17 @@ public class TransportImpl extends EndpointImpl
}
}
+ @Override
public void setIdleTimeout(int timeout) {
_localIdleTimeout = timeout;
}
+ @Override
public int getIdleTimeout() {
return _localIdleTimeout;
}
+ @Override
public int getRemoteIdleTimeout() {
return _remoteIdleTimeout;
}
@@ -1525,6 +1533,7 @@ public class TransportImpl extends EndpointImpl
_outputProcessor.close_head();
}
+ @Override
public boolean isClosed() {
int p = pending();
int c = capacity();
@@ -1588,4 +1597,20 @@ public class TransportImpl extends EndpointImpl
@Override
void localClose() {}
+
+ public void setSelectable(Selectable selectable) {
+ _selectable = selectable;
+ }
+
+ public Selectable getSelectable() {
+ return _selectable;
+ }
+
+ public void setReactor(Reactor reactor) {
+ _reactor = reactor;
+ }
+
+ public Reactor getReactor() {
+ return _reactor;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
new file mode 100644
index 0000000..02c5de2
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.reactor.impl.ReactorImpl;
+
+
+
+public interface Reactor {
+
+ public static final class Factory
+ {
+ public static Reactor create() throws IOException {
+ return new ReactorImpl();
+ }
+ }
+
+ public long mark();
+ public long now();
+ public void attach(Object attachment);
+ public Object attachment();
+ public long getTimeout();
+
+ public void setTimeout(long timeout);
+
+ public Handler getGlobalHandler();
+
+ public void setGlobalHandler(Handler handler);
+
+ public Handler getHandler();
+
+ public void setHandler(Handler handler);
+
+/* TODO
+ * pn_io_t *pn_reactor_io(pn_reactor_t *reactor) {
+166 assert(reactor);
+167 return reactor->io;
+168 }
+169
+
+ */
+
+ public Set<Selectable> children();
+
+ public Collector collector();
+
+
+ public Selectable selectable();
+
+
+
+ public void update(Selectable selectable);
+
+
+ void yield() ;
+
+ public boolean quiesced();
+
+ public boolean process();
+
+ public void wakeup() throws IOException;
+
+ public void start() ;
+
+ public void stop() ;
+
+ public void run();
+
+ // pn_reactor_schedule from reactor.c
+ public Task schedule(int delay, Handler handler);
+ // TODO: acceptor
+ // TODO: connection
+ // TODO: acceptorClose
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
new file mode 100644
index 0000000..c2b560f
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+import java.nio.channels.SelectableChannel;
+
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.engine.Transport;
+
+public interface Selectable {
+
+ public interface Callback {
+ void run(Selectable selectable);
+ }
+
+ public boolean isReading();
+
+ boolean isWriting();
+
+ long getDeadline() ;
+
+ void setReading(boolean reading) ;
+
+ void setWriting(boolean writing);
+
+ void setDeadline(long deadline) ;
+
+ public void onReadable(Callback runnable) ;
+
+ public void onWritable(Callback runnable);
+
+ public void onExpired(Callback runnable);
+
+ public void onError(Callback runnable);
+
+ public void onRelease(Callback runnable);
+
+ public void onFinalize(Callback runnable);
+
+ void readable() ;
+
+ void writeable() ;
+
+ void expired() ;
+
+ void error();
+
+ void release() ;
+
+ void _finalize() ;
+
+ // These are equivalent to the C code's set/get file descritor functions.
+ void setChannel(SelectableChannel channel) ;
+
+ public SelectableChannel getChannel() ;
+
+ void setAttachment(Object attachment) ;
+
+ Object getAttachment() ;
+
+ boolean isRegistered() ;
+
+ void setRegistered(boolean registered) ;
+
+ void setCollector(final Collector collector) ;
+
+ public Reactor getReactor() ;
+ public void terminate() ;
+
+ public enum RecordKeyType {
+ PNI_TERMINATED
+ }
+
+ public enum RecordValueType {
+ PN_VOID
+ }
+
+ public boolean hasRecord(RecordKeyType type);
+
+ public void setRecord(RecordKeyType key, RecordValueType value) ;
+ public boolean isTerminal();
+
+
+ public Transport getTransport() ;
+
+ public void setTransport(Transport transport) ;
+
+ public void setReactor(Reactor reactor) ;
+
+ public void add(Handler handler);
+
+ public Handler getHandler() ;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
new file mode 100644
index 0000000..12188e2
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public interface Selector {
+
+ public void add(Selectable selectable) throws IOException ;
+
+ public void update(Selectable selectable);
+
+ public void remove(Selectable selectable) ;
+
+ public void select(long timeout) throws IOException ;
+
+ public Iterator<Selectable> readable() ;
+
+ public Iterator<Selectable> writeable() ;
+
+ public Iterator<Selectable> expired() ;
+ public Iterator<Selectable> error() ;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
new file mode 100644
index 0000000..88031c5
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+import org.apache.qpid.proton.engine.Handler;
+
+public interface Task {
+
+ public long deadline();
+ public void setReactor(Reactor reactor) ;
+ public Reactor getReactor();
+ public void setHandler(Handler handler) ;
+ public Handler getHandler() ;
+ public Object getAttachment() ;
+ public void setAttachment(Object attachment) ;
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org