You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/01/13 18:15:17 UTC
[2/5] qpid-proton-j git commit: PROTON-1385: remove redundant and
offset 'java' dirs from examples area
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/engine/src/main/java/org/apache/qpid/proton/examples/FlowController.java
----------------------------------------------------------------------
diff --git a/examples/engine/src/main/java/org/apache/qpid/proton/examples/FlowController.java b/examples/engine/src/main/java/org/apache/qpid/proton/examples/FlowController.java
new file mode 100644
index 0000000..d22a637
--- /dev/null
+++ b/examples/engine/src/main/java/org/apache/qpid/proton/examples/FlowController.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+
+/**
+ * FlowController
+ *
+ */
+
+public class FlowController extends BaseHandler
+{
+
+ final private int window;
+
+ public FlowController(int window) {
+ this.window = window;
+ }
+
+ private void topUp(Receiver rcv) {
+ int delta = window - rcv.getCredit();
+ rcv.flow(delta);
+ }
+
+ @Override
+ public void onLinkLocalOpen(Event evt) {
+ Link link = evt.getLink();
+ if (link instanceof Receiver) {
+ topUp((Receiver) link);
+ }
+ }
+
+ @Override
+ public void onLinkRemoteOpen(Event evt) {
+ Link link = evt.getLink();
+ if (link instanceof Receiver) {
+ topUp((Receiver) link);
+ }
+ }
+
+ @Override
+ public void onLinkFlow(Event evt) {
+ Link link = evt.getLink();
+ if (link instanceof Receiver) {
+ topUp((Receiver) link);
+ }
+ }
+
+ @Override
+ public void onDelivery(Event evt) {
+ Link link = evt.getLink();
+ if (link instanceof Receiver) {
+ topUp((Receiver) link);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/engine/src/main/java/org/apache/qpid/proton/examples/Handshaker.java
----------------------------------------------------------------------
diff --git a/examples/engine/src/main/java/org/apache/qpid/proton/examples/Handshaker.java b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Handshaker.java
new file mode 100644
index 0000000..c53d0f8
--- /dev/null
+++ b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Handshaker.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Handshaker
+ *
+ */
+
+public class Handshaker extends BaseHandler
+{
+
+ @Override
+ public void onConnectionRemoteOpen(Event evt) {
+ Connection conn = evt.getConnection();
+ if (conn.getLocalState() == EndpointState.UNINITIALIZED) {
+ conn.open();
+ }
+ }
+
+ @Override
+ public void onSessionRemoteOpen(Event evt) {
+ Session ssn = evt.getSession();
+ if (ssn.getLocalState() == EndpointState.UNINITIALIZED) {
+ ssn.open();
+ }
+ }
+
+ @Override
+ public void onLinkRemoteOpen(Event evt) {
+ Link link = evt.getLink();
+ if (link.getLocalState() == EndpointState.UNINITIALIZED) {
+ link.setSource(link.getRemoteSource());
+ link.setTarget(link.getRemoteTarget());
+ link.open();
+ }
+ }
+
+ @Override
+ public void onConnectionRemoteClose(Event evt) {
+ Connection conn = evt.getConnection();
+ if (conn.getLocalState() != EndpointState.CLOSED) {
+ conn.close();
+ }
+ }
+
+ @Override
+ public void onSessionRemoteClose(Event evt) {
+ Session ssn = evt.getSession();
+ if (ssn.getLocalState() != EndpointState.CLOSED) {
+ ssn.close();
+ }
+ }
+
+ @Override
+ public void onLinkRemoteClose(Event evt) {
+ Link link = evt.getLink();
+ if (link.getLocalState() != EndpointState.CLOSED) {
+ link.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/engine/src/main/java/org/apache/qpid/proton/examples/Message.java
----------------------------------------------------------------------
diff --git a/examples/engine/src/main/java/org/apache/qpid/proton/examples/Message.java b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Message.java
new file mode 100644
index 0000000..b439f69
--- /dev/null
+++ b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Message.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * Message
+ *
+ */
+
+public class Message
+{
+ private final byte[] bytes;
+
+ /**
+ * These bytes are expected to be AMQP encoded.
+ */
+ public Message(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ private static final byte[] PREFIX = {(byte)0x00, (byte)0x53, (byte)0x77, (byte)0xb1};
+
+ private static byte[] encodeString(String string) {
+ byte[] utf8 = string.getBytes();
+ byte[] result = new byte[PREFIX.length + 4 + utf8.length];
+ ByteBuffer bbuf = ByteBuffer.wrap(result);
+ bbuf.put(PREFIX);
+ bbuf.putInt(utf8.length);
+ bbuf.put(utf8);
+ return result;
+ }
+
+ public Message(String string) {
+ // XXX: special case string encoding for now
+ this(encodeString(string));
+ }
+
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("Message(");
+ for (byte b : bytes) {
+ if (b >= 32 && b < 127) {
+ bld.append((char) b);
+ } else {
+ bld.append("\\x");
+ String hex = Integer.toHexString(0xFF & b);
+ if (hex.length() < 2) {
+ bld.append("0");
+ }
+ bld.append(hex);
+ }
+ }
+ bld.append(')');
+ return bld.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/engine/src/main/java/org/apache/qpid/proton/examples/Pool.java
----------------------------------------------------------------------
diff --git a/examples/engine/src/main/java/org/apache/qpid/proton/examples/Pool.java b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Pool.java
new file mode 100644
index 0000000..bb5bf86
--- /dev/null
+++ b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Pool.java
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Receiver;
+
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Pool
+ *
+ */
+
+public class Pool
+{
+
+ final private Collector collector;
+ final private Map<String,Connection> connections;
+
+ final private LinkConstructor<Sender> outgoingConstructor = new LinkConstructor<Sender> () {
+ public Sender create(Session ssn, String remote, String local) {
+ return newOutgoing(ssn, remote, local);
+ }
+ };
+ final private LinkConstructor<Receiver> incomingConstructor = new LinkConstructor<Receiver> () {
+ public Receiver create(Session ssn, String remote, String local) {
+ return newIncoming(ssn, remote, local);
+ }
+ };
+
+ final private LinkResolver<Sender> outgoingResolver;
+ final private LinkResolver<Receiver> incomingResolver;
+
+ public Pool(Collector collector, final Router router) {
+ this.collector = collector;
+ connections = new HashMap<String,Connection>();
+
+ if (router != null) {
+ outgoingResolver = new LinkResolver<Sender>() {
+ public Sender resolve(String address) {
+ return router.getOutgoing(address).choose();
+ }
+ };
+ incomingResolver = new LinkResolver<Receiver>() {
+ public Receiver resolve(String address) {
+ return router.getIncoming(address).choose();
+ }
+ };
+ } else {
+ outgoingResolver = new LinkResolver<Sender>() {
+ public Sender resolve(String address) { return null; }
+ };
+ incomingResolver = new LinkResolver<Receiver>() {
+ public Receiver resolve(String address) { return null; }
+ };
+ }
+ }
+
+ public Pool(Collector collector) {
+ this(collector, null);
+ }
+
+ private <T extends Link> T resolve(String remote, String local,
+ LinkResolver<T> resolver,
+ LinkConstructor<T> constructor) {
+ String host = remote.substring(2).split("/", 2)[0];
+ T link = resolver.resolve(remote);
+ if (link == null) {
+ Connection conn = connections.get(host);
+ if (conn == null) {
+ conn = Connection.Factory.create();
+ conn.collect(collector);
+ conn.setHostname(host);
+ conn.open();
+ connections.put(host, conn);
+ }
+
+ Session ssn = conn.session();
+ ssn.open();
+
+ link = constructor.create(ssn, remote, local);
+ link.open();
+ }
+ return link;
+ }
+
+ public Sender outgoing(String target, String source) {
+ return resolve(target, source, outgoingResolver, outgoingConstructor);
+ }
+
+ public Receiver incoming(String source, String target) {
+ return resolve(source, target, incomingResolver, incomingConstructor);
+ }
+
+ public Sender newOutgoing(Session ssn, String remote, String local) {
+ Sender snd = ssn.sender(String.format("%s-%s", local, remote));
+ Source src = new Source();
+ src.setAddress(local);
+ snd.setSource(src);
+ Target tgt = new Target();
+ tgt.setAddress(remote);
+ snd.setTarget(tgt);
+ return snd;
+ }
+
+ public Receiver newIncoming(Session ssn, String remote, String local) {
+ Receiver rcv = ssn.receiver(String.format("%s-%s", remote, local));
+ Source src = new Source();
+ src.setAddress(remote);
+ rcv.setSource(src);
+ Target tgt = new Target();
+ tgt.setAddress(remote);
+ rcv.setTarget(tgt);
+ return rcv;
+ }
+
+ public static interface LinkConstructor<T extends Link> {
+ T create(Session session, String remote, String local);
+ }
+
+ public static interface LinkResolver<T extends Link> {
+ T resolve(String remote);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/engine/src/main/java/org/apache/qpid/proton/examples/Router.java
----------------------------------------------------------------------
diff --git a/examples/engine/src/main/java/org/apache/qpid/proton/examples/Router.java b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Router.java
new file mode 100644
index 0000000..873f16c
--- /dev/null
+++ b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Router.java
@@ -0,0 +1,191 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import org.apache.qpid.proton.amqp.transport.Source;
+import org.apache.qpid.proton.amqp.transport.Target;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+
+/**
+ * Router
+ *
+ */
+
+public class Router extends BaseHandler
+{
+
+ public static class Routes<T extends Link> {
+
+ List<T> routes = new ArrayList<T>();
+
+ void add(T route) {
+ routes.add(route);
+ }
+
+ void remove(T route) {
+ routes.remove(route);
+ }
+
+ int size() {
+ return routes.size();
+ }
+
+ public T choose() {
+ if (routes.isEmpty()) { return null; }
+ ThreadLocalRandom rand = ThreadLocalRandom.current();
+ int idx = rand.nextInt(0, routes.size());
+ return routes.get(idx);
+ }
+
+ }
+
+ private static final Routes<Sender> EMPTY_OUT = new Routes<Sender>();
+ private static final Routes<Receiver> EMPTY_IN = new Routes<Receiver>();
+
+ final private Map<String,Routes<Sender>> outgoing = new HashMap<String,Routes<Sender>>();
+ final private Map<String,Routes<Receiver>> incoming = new HashMap<String,Routes<Receiver>>();
+
+ public Router() {}
+
+ private String getAddress(Source source) {
+ if (source == null) {
+ return null;
+ } else {
+ return source.getAddress();
+ }
+ }
+
+ private String getAddress(Target target) {
+ if (target == null) {
+ return null;
+ } else {
+ return target.getAddress();
+ }
+ }
+
+ public String getAddress(Sender snd) {
+ String source = getAddress(snd.getSource());
+ String target = getAddress(snd.getTarget());
+ return source != null ? source : target;
+ }
+
+ public String getAddress(Receiver rcv) {
+ return getAddress(rcv.getTarget());
+ }
+
+ public Routes<Sender> getOutgoing(String address) {
+ Routes<Sender> routes = outgoing.get(address);
+ if (routes == null) { return EMPTY_OUT; }
+ return routes;
+ }
+
+ public Routes<Receiver> getIncoming(String address) {
+ Routes<Receiver> routes = incoming.get(address);
+ if (routes == null) { return EMPTY_IN; }
+ return routes;
+ }
+
+ private void add(Sender snd) {
+ String address = getAddress(snd);
+ Routes<Sender> routes = outgoing.get(address);
+ if (routes == null) {
+ routes = new Routes<Sender>();
+ outgoing.put(address, routes);
+ }
+ routes.add(snd);
+ }
+
+ private void remove(Sender snd) {
+ String address = getAddress(snd);
+ Routes<Sender> routes = outgoing.get(address);
+ if (routes != null) {
+ routes.remove(snd);
+ if (routes.size() == 0) {
+ outgoing.remove(address);
+ }
+ }
+ }
+
+ private void add(Receiver rcv) {
+ String address = getAddress(rcv);
+ Routes<Receiver> routes = incoming.get(address);
+ if (routes == null) {
+ routes = new Routes<Receiver>();
+ incoming.put(address, routes);
+ }
+ routes.add(rcv);
+ }
+
+ private void remove(Receiver rcv) {
+ String address = getAddress(rcv);
+ Routes<Receiver> routes = incoming.get(address);
+ if (routes != null) {
+ routes.remove(rcv);
+ if (routes.size() == 0) {
+ incoming.remove(address);
+ }
+ }
+ }
+
+ private void add(Link link) {
+ if (link instanceof Sender) {
+ add((Sender) link);
+ } else {
+ add((Receiver) link);
+ }
+ }
+
+ private void remove(Link link) {
+ if (link instanceof Sender) {
+ remove((Sender) link);
+ } else {
+ remove((Receiver) link);
+ }
+ }
+
+ @Override
+ public void onLinkLocalOpen(Event evt) {
+ add(evt.getLink());
+ }
+
+ @Override
+ public void onLinkLocalClose(Event evt) {
+ remove(evt.getLink());
+ }
+
+ @Override
+ public void onLinkFinal(Event evt) {
+ remove(evt.getLink());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/engine/src/main/java/org/apache/qpid/proton/examples/Server.java
----------------------------------------------------------------------
diff --git a/examples/engine/src/main/java/org/apache/qpid/proton/examples/Server.java b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Server.java
new file mode 100644
index 0000000..3e00bd5
--- /dev/null
+++ b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Server.java
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+
+import java.io.IOException;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Server
+ *
+ */
+
+public class Server extends BaseHandler
+{
+
+ private class MessageStore {
+
+ Map<String,Deque<Message>> messages = new HashMap<String,Deque<Message>>();
+
+ void put(String address, Message message) {
+ Deque<Message> queue = messages.get(address);
+ if (queue == null) {
+ queue = new ArrayDeque<Message>();
+ messages.put(address, queue);
+ }
+ queue.add(message);
+ }
+
+ Message get(String address) {
+ Deque<Message> queue = messages.get(address);
+ if (queue == null) { return null; }
+ Message msg = queue.remove();
+ if (queue.isEmpty()) {
+ messages.remove(address);
+ }
+ return msg;
+ }
+
+ }
+
+ final private MessageStore messages = new MessageStore();
+ final private Router router;
+ private boolean quiet;
+ private int tag = 0;
+
+ public Server(Router router, boolean quiet) {
+ this.router = router;
+ this.quiet = quiet;
+ }
+
+ private byte[] nextTag() {
+ return String.format("%s", tag++).getBytes();
+ }
+
+ private int send(String address) {
+ return send(address, null);
+ }
+
+ private int send(String address, Sender snd) {
+ if (snd == null) {
+ Router.Routes<Sender> routes = router.getOutgoing(address);
+ snd = routes.choose();
+ if (snd == null) {
+ return 0;
+ }
+ }
+
+ int count = 0;
+ while (snd.getCredit() > 0 && snd.getQueued() < 1024) {
+ Message msg = messages.get(address);
+ if (msg == null) {
+ snd.drained();
+ return count;
+ }
+ Delivery dlv = snd.delivery(nextTag());
+ byte[] bytes = msg.getBytes();
+ snd.send(bytes, 0, bytes.length);
+ dlv.settle();
+ count++;
+ if (!quiet) {
+ System.out.println(String.format("Sent message(%s): %s", address, msg));
+ }
+ }
+
+ return count;
+ }
+
+ @Override
+ public void onLinkFlow(Event evt) {
+ Link link = evt.getLink();
+ if (link instanceof Sender) {
+ Sender snd = (Sender) link;
+ send(router.getAddress(snd), snd);
+ }
+ }
+
+ @Override
+ public void onDelivery(Event evt) {
+ Delivery dlv = evt.getDelivery();
+ Link link = dlv.getLink();
+ if (link instanceof Sender) {
+ dlv.settle();
+ } else {
+ Receiver rcv = (Receiver) link;
+ if (!dlv.isPartial()) {
+ byte[] bytes = new byte[dlv.pending()];
+ rcv.recv(bytes, 0, bytes.length);
+ String address = router.getAddress(rcv);
+ Message message = new Message(bytes);
+ messages.put(address, message);
+ dlv.disposition(Accepted.getInstance());
+ dlv.settle();
+ if (!quiet) {
+ System.out.println(String.format("Got message(%s): %s", address, message));
+ }
+ send(address);
+ }
+ }
+ }
+
+ public static final void main(String[] argv) throws IOException {
+ List<String> switches = new ArrayList<String>();
+ List<String> args = new ArrayList<String>();
+ for (String s : argv) {
+ if (s.startsWith("-")) {
+ switches.add(s);
+ } else {
+ args.add(s);
+ }
+ }
+
+ boolean quiet = switches.contains("-q");
+ String host = !args.isEmpty() && !Character.isDigit(args.get(0).charAt(0)) ?
+ args.remove(0) : "localhost";
+ int port = !args.isEmpty() ? Integer.parseInt(args.remove(0)) : 5672;
+
+ Collector collector = Collector.Factory.create();
+ Router router = new Router();
+ Driver driver = new Driver(collector, new Handshaker(),
+ new FlowController(1024), router,
+ new Server(router, quiet));
+ driver.listen(host, port);
+ driver.run();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/engine/src/main/java/org/apache/qpid/proton/examples/Spout.java
----------------------------------------------------------------------
diff --git a/examples/engine/src/main/java/org/apache/qpid/proton/examples/Spout.java b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Spout.java
new file mode 100644
index 0000000..1d70aca
--- /dev/null
+++ b/examples/engine/src/main/java/org/apache/qpid/proton/examples/Spout.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Sender;
+
+public class Spout extends BaseHandler
+{
+ private int count;
+ private int sent;
+ private int settled;
+ private boolean quiet;
+
+ public Spout(int count, boolean quiet) {
+ this.count = count;
+ this.quiet = quiet;
+ }
+
+ @Override
+ public void onLinkFlow(Event evt) {
+ Link link = evt.getLink();
+ if (link instanceof Sender) {
+ Sender sender = (Sender) link;
+ while ((sent < count) && sender.getCredit() > 0) {
+ Delivery dlv = sender.delivery(String.format("spout-%s", sent).getBytes());
+
+ Message msg = new Message(String.format("Hello World! [%s]", sent));
+ byte[] bytes = msg.getBytes();
+ sender.send(bytes, 0, bytes.length);
+ sender.advance();
+
+ if (!quiet) {
+ System.out.println(String.format("Sent %s to %s: %s", new String(dlv.getTag()),
+ sender.getTarget().getAddress(), msg));
+ }
+ sent++;
+ }
+ }
+ }
+
+ @Override
+ public void onDelivery(Event evt) {
+ Delivery dlv = evt.getDelivery();
+ if (dlv.remotelySettled()) {
+ if (!quiet) {
+ System.out.println(String.format("Settled %s: %s", new String(dlv.getTag()), dlv.getRemoteState()));
+ }
+ dlv.settle();
+ settled++;
+ }
+
+ if (settled >= count) {
+ dlv.getLink().getSession().getConnection().close();
+ }
+ }
+
+ @Override
+ public void onConnectionRemoteClose(Event evt) {
+ System.out.println("settled: " + settled);
+ }
+
+ public static void main(String[] argv) throws Exception {
+ List<String> switches = new ArrayList<String>();
+ List<String> args = new ArrayList<String>();
+ for (String s : argv) {
+ if (s.startsWith("-")) {
+ switches.add(s);
+ } else {
+ args.add(s);
+ }
+ }
+
+ boolean quiet = switches.contains("-q");
+ String address = !args.isEmpty() && args.get(0).startsWith("/") ?
+ args.remove(0) : "//localhost";
+ int count = !args.isEmpty() ? Integer.parseInt(args.remove(0)) : 1;
+
+ Collector collector = Collector.Factory.create();
+
+ Spout spout = new Spout(count, quiet);
+
+ Driver driver = new Driver(collector, spout);
+
+ Pool pool = new Pool(collector);
+ pool.outgoing(address, null);
+
+ driver.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/.gitignore
----------------------------------------------------------------------
diff --git a/examples/java/reactor/.gitignore b/examples/java/reactor/.gitignore
deleted file mode 100644
index 5e56e04..0000000
--- a/examples/java/reactor/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/bin
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/README.md
----------------------------------------------------------------------
diff --git a/examples/java/reactor/README.md b/examples/java/reactor/README.md
deleted file mode 100644
index c6b532f..0000000
--- a/examples/java/reactor/README.md
+++ /dev/null
@@ -1,55 +0,0 @@
-The Reactor API provides a means to dispatch events occurring across
-one or more connections. It can be used purely as a dispatch tool
-alongside your own I/O mechanism, however by default it is configured
-with a handler that provides I/O for you.
-
-When programming with the reactor it is important to understand the
-dispatch model used to process events. Every event is associated with
-a context object, i.e. the *target* object upon which the event
-occurred. These objects are contained either directly or indirectly
-within the Reactor:
-
- Delivery --> Link --> Session --> Connection --+
- |
- Task --+--> Reactor
- |
- Selectable --+
-
-
-Each event is dispatched first to a target-specific handler, and
-second to a global handler. The target-specific handler for an event
-is located by searching from the event context up through the
-hierarchy (terminating with the Reactor) and retrieving the most
-specific handler found.
-
-This means that any handler set on the Reactor could receive events
-targeting any object. For example if no handlers are associated with a
-Connection or any of its child objects, then the Reactor's handler
-will receive all the events for that Connection.
-
-Putting a handler on any child, e.g. a Connection or Session or Link
-will prevent any handlers set on the ancestors of that object from
-seeing any events targeted for that object or its children unless that
-handler specifically chooses to delegate those events up to the
-parent, e.g. by overriding onUnhandled and delegating.
-
-The global handler (used to dispatch all events after the
-target-specific handler is invoked) can be accessed and modified using
-Reactor.set/getGlobalHandler. This can be useful for a number of
-reasons, e.g. you could log all events by doing this:
-
- reactor.getGlobalHandler().add(new LoggerHandler());
-
-Where LoggerHandler does this:
-
- public void onUnhandled(Event evt) {
- System.out.println(evt);
- }
-
-The above trick is particularly useful for debugging.
-
-Handlers themselves can have child handlers which will automatically
-delegate the event to those children *after* dispatching the event to
-itself. The default global handler is what provides the default I/O
-behavior of the reactor. To use the reactor as a pure dispatch
-mechanism you can simply set the global handler to null.
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/reactor/pom.xml b/examples/java/reactor/pom.xml
deleted file mode 100644
index 3ca3ec3..0000000
--- a/examples/java/reactor/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <parent>
- <groupId>org.apache.qpid</groupId>
- <artifactId>proton-project</artifactId>
- <version>0.17.0-SNAPSHOT</version>
- <relativePath>../../..</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>proton-j-reactor-examples</artifactId>
- <name>proton-j-reactor-examples</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>proton-j</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- </dependencies>
-
- <scm>
- <url>http://svn.apache.org/viewvc/qpid/proton/</url>
- </scm>
-</project>
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/run
----------------------------------------------------------------------
diff --git a/examples/java/reactor/run b/examples/java/reactor/run
deleted file mode 100755
index 51bd155..0000000
--- a/examples/java/reactor/run
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-CLASS=$1
-shift
-mvn -q -e exec:java -Dexec.mainClass=org.apache.qpid.proton.example.reactor.${CLASS} -Dexec.args="$*"
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java
deleted file mode 100644
index cb8ceca..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Pipe.SourceChannel;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Selectable;
-
-public class Cat extends BaseHandler {
-
- private class EchoHandler extends BaseHandler {
- @Override
- public void onSelectableInit(Event event) {
- Selectable selectable = event.getSelectable();
- // We can configure a selectable with any SelectableChannel we want.
- selectable.setChannel(channel);
- // Ask to be notified when the channel is readable
- selectable.setReading(true);
- event.getReactor().update(selectable);
- }
-
- @Override
- public void onSelectableReadable(Event event) {
- Selectable selectable = event.getSelectable();
-
- // The onSelectableReadable event tells us that there is data
- // to be read, or the end of stream has been reached.
- SourceChannel channel = (SourceChannel)selectable.getChannel();
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- try {
- while(true) {
- int amount = channel.read(buffer);
- if (amount < 0) {
- selectable.terminate();
- selectable.getReactor().update(selectable);
- }
- if (amount <= 0) break;
- System.out.write(buffer.array(), 0, buffer.position());
- buffer.clear();
- }
- } catch(IOException ioException) {
- ioException.printStackTrace();
- selectable.terminate();
- selectable.getReactor().update(selectable);
- }
- }
- }
-
- private final SourceChannel channel;
-
- private Cat(SourceChannel channel) {
- this.channel = channel;
- }
-
- @Override
- public void onReactorInit(Event event) {
- Reactor reactor = event.getReactor();
- Selectable selectable = reactor.selectable();
- setHandler(selectable, new EchoHandler());
- reactor.update(selectable);
- }
-
- public static void main(String[] args) throws IOException {
- if (args.length != 1) {
- System.err.println("Specify a file name as an argument.");
- System.exit(1);
- }
- FileInputStream inFile = new FileInputStream(args[0]);
- SourceChannel inChannel = EchoInputStreamWrapper.wrap(inFile);
- Reactor reactor = Proton.reactor(new Cat(inChannel));
- reactor.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
deleted file mode 100644
index 9a5a0b4..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.reactor.Reactor;
-
-// Let's try to modify our counter example. In addition to counting to
-// 10 in quarter second intervals, let's also print out a random number
-// every half second. This is not a super easy thing to express in a
-// purely sequential program, but not so difficult using events.
-public class CountRandomly extends BaseHandler {
-
- private long startTime;
- private CounterHandler counter;
-
- class CounterHandler extends BaseHandler {
- private final int limit;
- private int count;
-
- CounterHandler(int limit) {
- this.limit = limit;
- }
-
- @Override
- public void onTimerTask(Event event) {
- count += 1;
- System.out.println(count);
-
- if (!done()) {
- event.getReactor().schedule(250, this);
- }
- }
-
- // Provide a method to check for doneness
- private boolean done() {
- return count >= limit;
- }
- }
-
- @Override
- public void onReactorInit(Event event) {
- startTime = System.currentTimeMillis();
- System.out.println("Hello, World!");
-
- // Save the counter instance in an attribute so we can refer to
- // it later.
- counter = new CounterHandler(10);
- event.getReactor().schedule(250, counter);
-
- // Now schedule another event with a different handler. Note
- // that the timer tasks go to separate handlers, and they don't
- // interfere with each other.
- event.getReactor().schedule(500, this);
- }
-
- @Override
- public void onTimerTask(Event event) {
- // keep on shouting until we are done counting
- System.out.println("Yay, " + Math.round(Math.abs((Math.random() * 110) - 10)));
- if (!counter.done()) {
- event.getReactor().schedule(500, this);
- }
- }
-
- @Override
- public void onReactorFinal(Event event) {
- long elapsedTime = System.currentTimeMillis() - startTime;
- System.out.println("Goodbye, World! (after " + elapsedTime + " long milliseconds)");
- }
-
- public static void main(String[] args) throws IOException {
- // In HelloWorld.java we said the reactor exits when there are no more
- // events to process. While this is true, it's not actually complete.
- // The reactor exits when there are no more events to process and no
- // possibility of future events arising. For that reason the reactor
- // will keep running until there are no more scheduled events and then
- // exit.
- Reactor reactor = Proton.reactor(new CountRandomly());
- reactor.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
deleted file mode 100644
index b05685a..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.reactor.Reactor;
-
-public class Counter extends BaseHandler {
-
- private long startTime;
-
- class CounterHandler extends BaseHandler {
- private final int limit;
- private int count;
-
- CounterHandler(int limit) {
- this.limit = limit;
- }
-
- @Override
- public void onTimerTask(Event event) {
- count += 1;
- System.out.println(count);
- if (count < limit) {
- // A recurring task can be accomplished by just scheduling
- // another event.
- event.getReactor().schedule(250, this);
- }
- }
- }
-
- @Override
- public void onReactorInit(Event event) {
- startTime = System.currentTimeMillis();
- System.out.println("Hello, World!");
-
- // Note that unlike the previous scheduling example, we pass in
- // a separate object for the handler. This means that the timer
- // event we just scheduled will not be seen by the Counter
- // implementation of BaseHandler as it is being handled by the
- // CounterHandler instance we create.
- event.getReactor().schedule(250, new CounterHandler(10));
- }
-
- @Override
- public void onReactorFinal(Event event) {
- long elapsedTime = System.currentTimeMillis() - startTime;
- System.out.println("Goodbye, World! (after " + elapsedTime + " long milliseconds)");
- }
-
- public static void main(String[] args) throws IOException {
- // In HelloWorld.java we said the reactor exits when there are no more
- // events to process. While this is true, it's not actually complete.
- // The reactor exits when there are no more events to process and no
- // possibility of future events arising. For that reason the reactor
- // will keep running until there are no more scheduled events and then
- // exit.
- Reactor reactor = Proton.reactor(new Counter());
- reactor.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
deleted file mode 100644
index 7b4e36f..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Handler;
-import org.apache.qpid.proton.reactor.Reactor;
-
-// Events know how to dispatch themselves to handlers. By combining
-// this with on_unhandled, you can provide a kind of inheritance
-/// between handlers using delegation.
-public class Delegates extends BaseHandler {
-
- private final Handler[] handlers;
-
- static class Hello extends BaseHandler {
- @Override
- public void onReactorInit(Event e) {
- System.out.println("Hello, World!");
- }
- }
-
- static class Goodbye extends BaseHandler {
- @Override
- public void onReactorFinal(Event e) {
- System.out.println("Goodbye, World!");
- }
- }
-
- public Delegates(Handler... handlers) {
- this.handlers = handlers;
- }
-
- @Override
- public void onUnhandled(Event event) {
- for (Handler handler : handlers) {
- event.dispatch(handler);
- }
- }
-
- public static void main(String[] args) throws IOException {
- Reactor reactor = Proton.reactor(new Delegates(new Hello(), new Goodbye()));
- reactor.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java
deleted file mode 100644
index 852bf8e..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Pipe.SourceChannel;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Selectable;
-
-public class Echo extends BaseHandler {
-
- private class EchoHandler extends BaseHandler {
- @Override
- public void onSelectableInit(Event event) {
- Selectable selectable = event.getSelectable();
- // We can configure a selectable with any SelectableChannel we want.
- selectable.setChannel(channel);
- // Ask to be notified when the channel is readable
- selectable.setReading(true);
- event.getReactor().update(selectable);
- }
-
- @Override
- public void onSelectableReadable(Event event) {
- Selectable selectable = event.getSelectable();
-
- // The onSelectableReadable event tells us that there is data
- // to be read, or the end of stream has been reached.
- SourceChannel channel = (SourceChannel)selectable.getChannel();
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- try {
- while(true) {
- int amount = channel.read(buffer);
- if (amount < 0) {
- selectable.terminate();
- selectable.getReactor().update(selectable);
- }
- if (amount <= 0) break;
- System.out.write(buffer.array(), 0, buffer.position());
- buffer.clear();
- }
- } catch(IOException ioException) {
- ioException.printStackTrace();
- selectable.terminate();
- selectable.getReactor().update(selectable);
- }
- }
- }
-
- private final SourceChannel channel;
-
- private Echo(SourceChannel channel) {
- this.channel = channel;
- }
-
- @Override
- public void onReactorInit(Event event) {
- // Every selectable is a possible source of future events. Our
- // selectable stays alive until it reads the end of stream
- // marker. This will keep the whole reactor running until we
- // type Control-D.
- System.out.println("Type whatever you want and then use Control-D to exit:");
- Reactor reactor = event.getReactor();
- Selectable selectable = reactor.selectable();
- setHandler(selectable, new EchoHandler());
- reactor.update(selectable);
- }
-
- public static void main(String[] args) throws IOException {
- SourceChannel inChannel = EchoInputStreamWrapper.wrap(System.in);
- Reactor reactor = Proton.reactor(new Echo(inChannel));
- reactor.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
deleted file mode 100644
index 2e53d09..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Pipe;
-import java.nio.channels.Pipe.SinkChannel;
-import java.nio.channels.Pipe.SourceChannel;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class EchoInputStreamWrapper extends Thread {
-
- private final InputStream in;
- private final SinkChannel out;
- private final byte[] bufferBytes = new byte[1024];
- private final ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
- private final AtomicInteger idCounter = new AtomicInteger();
-
- private EchoInputStreamWrapper(InputStream in, SinkChannel out) {
- this.in = in;
- this.out = out;
- setName(getClass().getName() + "-" + idCounter.incrementAndGet());
- setDaemon(true);
- }
-
- @Override
- public void run() {
- try {
- while(true) {
- int amount = in.read(bufferBytes);
- if (amount < 0) break;
- buffer.position(0);
- buffer.limit(amount);
- out.write(buffer);
- }
- } catch(IOException ioException) {
- ioException.printStackTrace();
- } finally {
- try {
- out.close();
- } catch(IOException ioException) {
- ioException.printStackTrace();
- }
- }
- }
-
- public static SourceChannel wrap(InputStream in) throws IOException {
- Pipe pipe = Pipe.open();
- new EchoInputStreamWrapper(in, pipe.sink()).start();
- SourceChannel result = pipe.source();
- result.configureBlocking(false);
- return result;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
deleted file mode 100644
index ec56bd5..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.reactor.Reactor;
-
-// Not every event goes to the reactor's event handler. If we have a
-// separate handler for something like a scheduled task, then those
-// events aren't logged by the logger associated with the reactor's
-// handler. Sometimes this is useful if you don't want to see them, but
-// sometimes you want the global picture.
-public class GlobalLogger extends BaseHandler {
-
- static class Logger extends BaseHandler {
- @Override
- public void onUnhandled(Event event) {
- System.out.println("LOG: " + event);
- }
- }
-
- static class Task extends BaseHandler {
- @Override
- public void onTimerTask(Event e) {
- System.out.println("Mission accomplished!");
- }
- }
-
- @Override
- public void onReactorInit(Event event) {
- System.out.println("Hello, World!");
- event.getReactor().schedule(0, new Task());
- }
-
- @Override
- public void onReactorFinal(Event e) {
- System.out.println("Goodbye, World!");
- }
-
- public static void main(String[] args) throws IOException {
- Reactor reactor = Proton.reactor(new GlobalLogger());
-
- // In addition to having a regular handler, the reactor also has a
- // global handler that sees every event. By adding the Logger to the
- // global handler instead of the regular handler, we can log every
- // single event that occurs in the system regardless of whether or not
- // there are specific handlers associated with the objects that are the
- // target of those events.
- reactor.getGlobalHandler().add(new Logger());
- reactor.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
deleted file mode 100644
index 6a69ba1..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.reactor.Reactor;
-
-// So far the reactive hello-world doesn't look too different from a
-// regular old non-reactive hello-world. The onReactorInit method can
-// be used roughly as a 'main' method would. A program that only uses
-// that one event, however, isn't going to be very reactive. By using
-// other events, we can write a fully reactive program.
-public class GoodbyeWorld extends BaseHandler {
-
- // As before we handle the reactor init event.
- @Override
- public void onReactorInit(Event event) {
- System.out.println("Hello, World!");
- }
-
- // In addition to an initial event, the reactor also produces an
- // event when it is about to exit. This may not behave much
- // differently than just putting the goodbye print statement inside
- // onReactorInit, but as we grow our program, this piece of it
- // will always be what happens last, and will always happen
- // regardless of what other paths the main logic of our program
- // might take.
- @Override
- public void onReactorFinal(Event e) {
- System.out.println("Goodbye, World!");;
- }
-
- public static void main(String[] args) throws IOException {
- Reactor reactor = Proton.reactor(new GoodbyeWorld());
- reactor.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
deleted file mode 100644
index 39a36fb..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.reactor.Reactor;
-
-/*
- * The proton reactor provides a general purpose event processing
- * library for writing reactive programs. A reactive program is defined
- * by a set of event handlers. An event handler is just any class or
- * object that extends the Handler interface. For convenience, a class
- * can extend BaseHandler and only handle the events that it cares to
- * implement methods for.
- */
-public class HelloWorld extends BaseHandler {
-
- // The reactor init event is produced by the reactor itself when it
- // starts.
- @Override
- public void onReactorInit(Event event) {
- System.out.println("Hello, World!");
- }
-
- public static void main(String[] args) throws IOException {
-
- // When you construct a reactor, you can give it a handler that
- // is used, by default, to receive events generated by the reactor.
- Reactor reactor = Proton.reactor(new HelloWorld());
-
- // When you call run, the reactor will process events. The reactor init
- // event is what kicks off everything else. When the reactor has no
- // more events to process, it exits.
- reactor.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/README.md
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/README.md b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/README.md
deleted file mode 100644
index 73fbb87..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/README.md
+++ /dev/null
@@ -1,31 +0,0 @@
-The examples in this directory provide a basic introduction to the
-proton reactor API and are best viewed in the order presented below.
-
-The examples contain comments that explain things in a tutorial-style
-manner. At some point soon this content will be pulled out into a
-proper tutorial that references the relevant code snippets from these
-examples. Until then please bear with this clumsy style of
-presentation.
-
-This API is present in Java and Python as well. Most of these examples will
-transliterate into C in a fairly straightforward way.
-
- - HelloWorld.java
- - GoodbyeWorld.java
-
- - Scheduling.java
- - Counter.java
- - CountRandomly.java
-
- - Unhandled.java
- - ReactorLogger.java
- - GlobalLogger.java
- - Delegates.java
-
- - Handlers.java
-
- - Echo.java
- - Cat.java
-
- - Send.java
- - Recv.java
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
deleted file mode 100644
index 31c7511..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.reactor.Reactor;
-
-public class ReactorLogger extends BaseHandler {
-
- public static class Logger extends BaseHandler {
- @Override
- public void onUnhandled(Event event) {
- System.out.println("LOG: " + event);
- }
- }
-
- @Override
- public void onReactorInit(Event e) {
- System.out.println("Hello, World!");
- }
-
- @Override
- public void onReactorFinal(Event e) {
- System.out.println("Goodbye, World!");
- }
-
- private static boolean loggingEnabled = false;
-
- public static void main(String[] args) throws IOException {
-
- // You can pass multiple handlers to a reactor when you construct it.
- // Each of these handlers will see every event the reactor sees. By
- // combining this with on_unhandled, you can log each event that goes
- // to the reactor.
- Reactor reactor = Proton.reactor(new ReactorLogger(), new Logger());
- reactor.run();
-
- // Note that if you wanted to add the logger later, you could also
- // write the above as below. All arguments to the reactor are just
- // added to the default handler for the reactor.
- reactor = Proton.reactor(new ReactorLogger());
- if (loggingEnabled)
- reactor.getHandler().add(new Logger());
- reactor.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java
deleted file mode 100644
index 96a348a..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.reactor.FlowController;
-import org.apache.qpid.proton.reactor.Handshaker;
-import org.apache.qpid.proton.reactor.Reactor;
-
-public class Recv extends BaseHandler {
-
- private Recv() {
- add(new Handshaker());
- add(new FlowController());
- }
-
- @Override
- public void onReactorInit(Event event) {
- try {
- // Create an amqp acceptor.
- event.getReactor().acceptor("0.0.0.0", 5672);
-
- // There is an optional third argument to the Reactor.acceptor
- // call. Using it, we could supply a handler here that would
- // become the handler for all accepted connections. If we omit
- // it, the reactor simply inherits all the connection events.
- } catch(IOException ioException) {
- ioException.printStackTrace();
- }
- }
-
- @Override
- public void onDelivery(Event event) {
- Receiver recv = (Receiver)event.getLink();
- Delivery delivery = recv.current();
- if (delivery.isReadable() && !delivery.isPartial()) {
- int size = delivery.pending();
- byte[] buffer = new byte[size];
- int read = recv.recv(buffer, 0, buffer.length);
- recv.advance();
-
- Message msg = Proton.message();
- msg.decode(buffer, 0, read);
- System.out.println(((AmqpValue)msg.getBody()).getValue());
- }
- }
-
- public static void main(String[] args) throws IOException {
- Reactor r = Proton.reactor(new Recv());
- r.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
deleted file mode 100644
index 3aed27a..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Task;
-
-public class Scheduling extends BaseHandler {
-
- private long startTime;
-
- @Override
- public void onReactorInit(Event event) {
- startTime = System.currentTimeMillis();
- System.out.println("Hello, World!");
-
- // We can schedule a task event for some point in the future.
- // This will cause the reactor to stick around until it has a
- // chance to process the event.
-
- // The first argument is the delay. The second argument is the
- // handler for the event. We are just using self for now, but
- // we could pass in another object if we wanted.
- Task task = event.getReactor().schedule(1000, this);
-
- // We can ignore the task if we want to, but we can also use it
- // to pass stuff to the handler.
- task.attachments().set("key", String.class, "Yay");
- }
-
- @Override
- public void onTimerTask(Event event) {
- Task task = event.getTask();
- System.out.println(task.attachments().get("key", String.class) + " my task is complete!");
- }
-
- @Override
- public void onReactorFinal(Event e) {
- long elapsedTime = System.currentTimeMillis() - startTime;
- System.out.println("Goodbye, World! (after " + elapsedTime + " long milliseconds)");
- }
-
- public static void main(String[] args) throws IOException {
- Reactor reactor = Proton.reactor(new Scheduling());
- reactor.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
deleted file mode 100644
index 5978c45..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-import java.nio.BufferOverflowException;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.reactor.Handshaker;
-import org.apache.qpid.proton.reactor.Reactor;
-
-// This is a send in terms of low level AMQP events.
-public class Send extends BaseHandler {
-
- private class SendHandler extends BaseHandler {
-
- private final Message message;
- private int nextTag = 0;
-
- private SendHandler(Message message) {
- this.message = message;
-
- // Add a child handler that performs some default handshaking
- // behaviour.
- add(new Handshaker());
- }
-
- @Override
- public void onConnectionInit(Event event) {
- Connection conn = event.getConnection();
-
- // Every session or link could have their own handler(s) if we
- // wanted simply by adding the handler to the given session
- // or link
- Session ssn = conn.session();
-
- // If a link doesn't have an event handler, the events go to
- // its parent session. If the session doesn't have a handler
- // the events go to its parent connection. If the connection
- // doesn't have a handler, the events go to the reactor.
- Sender snd = ssn.sender("sender");
- conn.open();
- ssn.open();
- snd.open();
- }
-
- @Override
- public void onLinkFlow(Event event) {
- Sender snd = (Sender)event.getLink();
- if (snd.getCredit() > 0) {
- byte[] msgData = new byte[1024];
- int length;
- while(true) {
- try {
- length = message.encode(msgData, 0, msgData.length);
- break;
- } catch(BufferOverflowException e) {
- msgData = new byte[msgData.length * 2];
- }
- }
- byte[] tag = String.valueOf(nextTag++).getBytes();
- Delivery dlv = snd.delivery(tag);
- snd.send(msgData, 0, length);
- dlv.settle();
- snd.advance();
- snd.close();
- snd.getSession().close();
- snd.getSession().getConnection().close();
- }
- }
-
- @Override
- public void onTransportError(Event event) {
- ErrorCondition condition = event.getTransport().getCondition();
- if (condition != null) {
- System.err.println("Error: " + condition.getDescription());
- } else {
- System.err.println("Error (no description returned).");
- }
- }
- }
-
- private final String host;
- private final int port;
- private final Message message;
-
- private Send(String host, int port, String content) {
- this.host = host;
- this.port = port;
- message = Proton.message();
- message.setBody(new AmqpValue(content));
- }
-
- @Override
- public void onReactorInit(Event event) {
- // You can use the connection method to create AMQP connections.
-
- // This connection's handler is the SendHandler object. All the events
- // for this connection will go to the SendHandler object instead of
- // going to the reactor. If you were to omit the SendHandler object,
- // all the events would go to the reactor.
- event.getReactor().connectionToHost(host, port, new SendHandler(message));
- }
-
- public static void main(String[] args) throws IOException {
- int port = 5672;
- String host = "localhost";
- if (args.length > 0) {
- String[] parts = args[0].split(":", 2);
- host = parts[0];
- if (parts.length > 1) {
- port = Integer.parseInt(parts[1]);
- }
- }
- String content = args.length > 1 ? args[1] : "Hello World!";
-
- Reactor r = Proton.reactor(new Send(host, port, content));
- r.run();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
deleted file mode 100644
index a3cc200..0000000
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.example.reactor;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.reactor.Reactor;
-
-public class Unhandled extends BaseHandler {
-
- // If an event occurs and its handler doesn't have an on_<event>
- // method, the reactor will attempt to call the on_unhandled method
- // if it exists. This can be useful not only for debugging, but for
- // logging and for delegating/inheritance.
- @Override
- public void onUnhandled(Event event) {
- System.out.println(event);
- }
-
- public static void main(String[] args) throws IOException {
- Reactor reactor = Proton.reactor(new Unhandled());
- reactor.run();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org