You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/07/06 01:45:06 UTC

[05/38] qpid-proton git commit: PROTON-881: Add a Send example, and supporting changes in the reactor.

PROTON-881: Add a Send example, and supporting changes in the reactor.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cd09de66
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cd09de66
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cd09de66

Branch: refs/heads/master
Commit: cd09de66362580f0c5ceab464d71c7ad4300b517
Parents: 88df5e7
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Tue Apr 21 16:23:12 2015 +0100
Committer: Adrian Preston <pr...@uk.ibm.com>
Committed: Wed May 6 23:23:47 2015 +0100

----------------------------------------------------------------------
 .../qpid/proton/example/reactor/Send.java       | 142 +++++++++++++++++++
 .../apache/qpid/proton/engine/Connection.java   |   7 +-
 .../qpid/proton/engine/impl/ConnectionImpl.java |  32 ++++-
 .../qpid/proton/engine/impl/EventImpl.java      |  12 +-
 .../qpid/proton/engine/impl/TransportImpl.java  |   8 +-
 .../apache/qpid/proton/reactor/Handshaker.java  |  72 ++++++++++
 .../org/apache/qpid/proton/reactor/Reactor.java |   5 +-
 .../qpid/proton/reactor/ReactorChild.java       |  27 ++++
 .../apache/qpid/proton/reactor/Selectable.java  |   2 +-
 .../qpid/proton/reactor/impl/IOHandler.java     |  19 ++-
 .../qpid/proton/reactor/impl/ReactorImpl.java   |  21 ++-
 .../qpid/proton/reactor/impl/SelectorImpl.java  |  30 ++--
 12 files changed, 342 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
new file mode 100644
index 0000000..5cd5811
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.reactor.Handshaker;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// This is a send in terms of low level AMQP events.
+public class Send extends BaseHandler {
+
+    private class SendHandler extends BaseHandler {
+
+        private final String hostname;
+        private final Message message;
+        private int nextTag = 0;
+
+        private SendHandler(String hostname, Message message) {
+            this.hostname = hostname;
+            this.message = message;
+
+            // Add a child handler that performs some default handshaking
+            // behaviour.
+            add(new Handshaker());
+        }
+
+        @Override
+        public void onConnectionInit(Event event) {
+            Connection conn = event.getConnection();
+            conn.setHostname(hostname);
+
+            // Every session or link could have their own handler(s) if we
+            // wanted simply by adding the handler to the given session
+            // or link
+            Session ssn = conn.session();
+
+            // If a link doesn't have an event handler, the events go to
+            // its parent session. If the session doesn't have a handler
+            // the events go to its parent connection. If the connection
+            // doesn't have a handler, the events go to the reactor.
+            Sender snd = ssn.sender("sender");
+            conn.open();
+            ssn.open();
+            snd.open();
+        }
+
+        @Override
+        public void onLinkFlow(Event event) {
+            Sender snd = (Sender)event.getLink();
+            if (snd.getCredit() > 0 && message != null) {
+                byte[] msgData = new byte[1024];
+                int length;
+                while(true) {
+                    try {
+                        length = message.encode(msgData, 0, msgData.length);
+                        break;
+                    } catch(BufferOverflowException e) {
+                        msgData = new byte[msgData.length * 2];
+                    }
+                }
+                byte[] tag = String.valueOf(nextTag++).getBytes();
+                Delivery dlv = snd.delivery(tag);
+                snd.send(msgData, 0, length);
+                dlv.settle();
+                snd.advance();
+                snd.close();
+                snd.getSession().close();
+                snd.getSession().getConnection().close();
+            }
+        }
+
+        @Override
+        public void onTransportError(Event event) {
+            ErrorCondition condition = event.getTransport().getCondition();
+            if (condition != null) {
+                System.err.println("Error: " + condition.getDescription());
+            } else {
+                System.err.println("Error (no description returned).");
+            }
+        }
+    }
+
+    private final String hostname;
+    private final Message message;
+
+    private Send(String hostname, String content) {
+        this.hostname = hostname;
+        message = Proton.message();
+        message.setBody(new AmqpValue(content));
+    }
+
+    @Override
+    public void onReactorInit(Event event) {
+        // You can use the connection method to create AMQP connections.
+
+        // This connection's handler is the SendHandler object. All the events
+        // for this connection will go to the SendHandler object instead of
+        // going to the reactor. If you were to omit the SendHandler object,
+        // all the events would go to the reactor.
+        event.getReactor().connection(new SendHandler(hostname, message));
+    }
+
+    public static void main(String[] args) throws IOException {
+        String hostname = args.length > 0 ? args[0] : "localhost";
+        String content = args.length > 1 ? args[1] : "Hello World!";
+
+        Reactor r = Proton.reactor(new Send(hostname, content));
+        r.run();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
index 5cb57a2..3dccbb1 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
@@ -25,6 +25,8 @@ import java.util.Map;
 
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.ReactorChild;
 
 
 /**
@@ -35,7 +37,7 @@ import org.apache.qpid.proton.engine.impl.ConnectionImpl;
  * {@link #sessionHead(EnumSet, EnumSet)}, {@link #linkHead(EnumSet, EnumSet)}
  * {@link #getWorkHead()} respectively.
  */
-public interface Connection extends HandlerEndpoint
+public interface Connection extends HandlerEndpoint, ReactorChild
 {
 
     public static final class Factory
@@ -110,12 +112,15 @@ public interface Connection extends HandlerEndpoint
 
     void setProperties(Map<Symbol,Object> properties);
 
+    @Override
     Object getContext();
 
+    @Override
     void setContext(Object context);
 
     void collect(Collector collector);
 
     Transport getTransport();
 
+    Reactor getReactor();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
index eecc05e..b018a95 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
@@ -25,9 +25,16 @@ import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.engine.*;
 import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.ProtonJConnection;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.reactor.Reactor;
 
 public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnection
 {
@@ -66,6 +73,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
 
     private Object _context;
     private CollectorImpl _collector;
+    private Reactor _reactor;
 
     private static final Symbol[] EMPTY_SYMBOL_ARRAY = new Symbol[0];
 
@@ -77,6 +85,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
     {
     }
 
+    @Override
     public SessionImpl session()
     {
         SessionImpl session = new SessionImpl(this);
@@ -154,6 +163,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
     }
 
 
+    @Override
     public Session sessionHead(final EnumSet<EndpointState> local, final EnumSet<EndpointState> remote)
     {
         if(_sessionHead == null)
@@ -168,6 +178,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
         }
     }
 
+    @Override
     public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
     {
         if(_linkHead == null)
@@ -274,6 +285,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
         }
     }
 
+    @Override
     public int getMaxChannels()
     {
         return _maxChannels;
@@ -290,6 +302,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
         _localContainerId = localContainerId;
     }
 
+    @Override
     public DeliveryImpl getWorkHead()
     {
         return _workHead;
@@ -376,11 +389,13 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
         return _properties;
     }
 
+    @Override
     public void setProperties(Map<Symbol, Object> properties)
     {
         _properties = properties;
     }
 
+    @Override
     public Map<Symbol, Object> getRemoteProperties()
     {
         return _remoteProperties;
@@ -391,6 +406,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
         _remoteProperties = remoteProperties;
     }
 
+    @Override
     public String getHostname()
     {
         return _localHostname;
@@ -471,6 +487,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
         _transport = transport;
     }
 
+    @Override
     public TransportImpl getTransport()
     {
         return _transport;
@@ -497,6 +514,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
             throw new UnsupportedOperationException();
         }
 
+        @Override
         public DeliveryImpl next()
         {
             DeliveryImpl next = _next;
@@ -588,16 +606,19 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
         }
     }
 
+    @Override
     public Object getContext()
     {
         return _context;
     }
 
+    @Override
     public void setContext(Object context)
     {
         _context = context;
     }
 
+    @Override
     public void collect(Collector collector)
     {
         _collector = (CollectorImpl) collector;
@@ -637,4 +658,13 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
     {
         put(Event.Type.CONNECTION_LOCAL_CLOSE, this);
     }
+
+    @Override
+    public Reactor getReactor() {
+        return _reactor;
+    }
+
+    public void setReactor(Reactor reactor) {
+        _reactor = reactor;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
index 65a2000..6abec58 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
@@ -295,17 +295,13 @@ class EventImpl implements Event
         } else if (context instanceof Transport) {
             return ((TransportImpl)context).getReactor();
         } else if (context instanceof Delivery) {
-            Transport transport = ((Delivery)context).getLink().getSession().getConnection().getTransport();
-            return ((TransportImpl)transport).getReactor();
+            return ((Delivery)context).getLink().getSession().getConnection().getReactor();
         } else if (context instanceof Link) {
-            Transport transport = ((Link)context).getSession().getConnection().getTransport();
-            return ((TransportImpl)transport).getReactor();
+            return ((Link)context).getSession().getConnection().getReactor();
         } else if (context instanceof Session) {
-            Transport transport = ((Session)context).getConnection().getTransport();
-            return ((TransportImpl)transport).getReactor();
+            return ((Session)context).getConnection().getReactor();
         } else if (context instanceof Connection) {
-            Transport transport = ((Connection)context).getTransport();
-            return ((TransportImpl)transport).getReactor();
+            return ((Connection)context).getReactor();
         } else if (context instanceof Selectable) {
             return ((Selectable)context).getReactor();
         }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index f4813cd..694e23b 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -121,6 +121,7 @@ public class TransportImpl extends EndpointImpl
 
     private boolean postedHeadClosed = false;
     private boolean postedTailClosed = false;
+    private boolean postedTransportError = false;
 
     private int _localIdleTimeout = 0;
     private int _remoteIdleTimeout = 0;
@@ -591,7 +592,9 @@ public class TransportImpl extends EndpointImpl
                 session.incrementOutgoingBytes(-delta);
             }
 
-            getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
+            if (snd.getLocalState() != EndpointState.CLOSED) {
+                getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
+            }
         }
 
         if(wasDone && delivery.getLocalState() != null)
@@ -1319,8 +1322,9 @@ public class TransportImpl extends EndpointImpl
             }
             _head_closed = true;
         }
-        if (_condition != null) {
+        if (_condition != null && !postedTransportError) {
             put(Event.Type.TRANSPORT_ERROR, this);
+            postedTransportError = true;
         }
         if (!postedTailClosed) {
             put(Event.Type.TRANSPORT_TAIL_CLOSED, this);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
new file mode 100644
index 0000000..f9b670a
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+
+public class Handshaker extends BaseHandler {
+
+    private void open(Endpoint endpoint) {
+        if (endpoint.getLocalState() == EndpointState.UNINITIALIZED) {
+            endpoint.open();
+        }
+    }
+
+    private void close(Endpoint endpoint) {
+        if (endpoint.getLocalState() != EndpointState.CLOSED) {
+            endpoint.close();
+        }
+    }
+
+    @Override
+    public void onConnectionRemoteOpen(Event event) {
+        open(event.getConnection());
+    }
+
+    @Override
+    public void onSessionRemoteOpen(Event event) {
+        open(event.getSession());
+    }
+
+    @Override
+    public void onLinkRemoteOpen(Event event) {
+        open(event.getLink());
+    }
+
+    @Override
+    public void onConnectionRemoteClose(Event event) {
+        close(event.getConnection());
+    }
+
+    @Override
+    public void onSessionRemoteClose(Event event) {
+        close(event.getSession());
+    }
+
+    @Override
+    public void onLinkRemoteClose(Event event) {
+        close(event.getLink());
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
index 02c5de2..0c56a48 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.Set;
 
 import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Handler;
 import org.apache.qpid.proton.reactor.impl.ReactorImpl;
 
@@ -64,7 +65,7 @@ public interface Reactor {
 
  */
 
-    public Set<Selectable> children();
+    public Set<ReactorChild> children();
 
     public Collector collector();
 
@@ -93,7 +94,7 @@ public interface Reactor {
     // pn_reactor_schedule from reactor.c
     public Task schedule(int delay, Handler handler);
     // TODO: acceptor
-    // TODO: connection
     // TODO: acceptorClose
 
+    Connection connection(Handler handler);
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
new file mode 100644
index 0000000..d020d1a
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+// Tagging interface used to identify classes that can be a child of a reactor.
+public interface ReactorChild {
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
index c2b560f..7bb64c7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
@@ -27,7 +27,7 @@ import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Handler;
 import org.apache.qpid.proton.engine.Transport;
 
-public interface Selectable {
+public interface Selectable extends ReactorChild {
 
     public interface Callback {
         void run(Selectable selectable);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index 08aca1f..ee988ee 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -22,6 +22,7 @@
 package org.apache.qpid.proton.reactor.impl;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
@@ -96,16 +97,20 @@ public class IOHandler extends BaseHandler {
             hostname = hostname.substring(0, colonIndex);
         }
 
-        Transport transport = event.getTransport();
+        Transport transport = event.getConnection().getTransport();
         Socket socket = null;   // TODO: null is our equivalent of PN_INVALID_SOCKET
         try {
-            socket = new Socket(hostname, port);
+            SocketChannel socketChannel = SocketChannel.open();
+            socketChannel.connect(new InetSocketAddress(hostname, port));
+            socket = socketChannel.socket();
         } catch(IOException ioException) {
-            ErrorCondition condition = transport.getCondition();
+            ErrorCondition condition = new ErrorCondition();
             condition.setCondition(Symbol.getSymbol("proton:io"));
             condition.setDescription(ioException.getMessage());
+            transport.setCondition(condition);
             transport.close_tail();
             transport.close_head();
+            transport.pop(transport.pending());   // TODO: force generation of TRANSPORT_HEAD_CLOSE (not in C code)
         }
         selectableTransport(reactor, socket, transport);
     }
@@ -170,9 +175,10 @@ public class IOHandler extends BaseHandler {
                         transport.process();
                     }
                 } catch (IOException e) {
-                    ErrorCondition condition = transport.getCondition();
+                    ErrorCondition condition = new ErrorCondition();
                     condition.setCondition(Symbol.getSymbol("proton:io"));
                     condition.setDescription(e.getMessage());
+                    transport.setCondition(condition);
                     transport.close_tail();
                 }
             }
@@ -201,9 +207,10 @@ public class IOHandler extends BaseHandler {
                         transport.pop(n);
                     }
                 } catch(IOException ioException) {
-                    ErrorCondition condition = transport.getCondition();
+                    ErrorCondition condition = new ErrorCondition();
                     condition.setCondition(Symbol.getSymbol("proton:io"));
                     condition.setDescription(ioException.getMessage());
+                    transport.setCondition(condition);
                     transport.close_head();
                 }
             }
@@ -259,7 +266,7 @@ public class IOHandler extends BaseHandler {
     private Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) {
         // TODO: this code needs to be able to deal with a null socket (this is our equivalent of PN_INVALID_SOCKET)
         Selectable selectable = reactor.selectable();
-        selectable.setChannel(socket.getChannel());
+        selectable.setChannel(socket != null ? socket.getChannel() : null);
         selectable.onReadable(new ConnectionReadable());
         selectable.onWritable(new ConnectionWritable());
         selectable.onError(new ConnectionError());

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index 5072958..0a7f84d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -30,12 +30,15 @@ import java.util.Set;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.engine.BaseHandler;
 import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Event.Type;
 import org.apache.qpid.proton.engine.Handler;
 import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
 import org.apache.qpid.proton.engine.impl.HandlerEndpointImpl;
 import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.ReactorChild;
 import org.apache.qpid.proton.reactor.Selectable;
 import org.apache.qpid.proton.reactor.Selectable.Callback;
 import org.apache.qpid.proton.reactor.Selectable.RecordKeyType;
@@ -68,7 +71,7 @@ public class ReactorImpl implements Reactor {
     private long timeout;
     private Handler global;
     private Handler handler;
-    private Set<Selectable> children;
+    private Set<ReactorChild> children;
     private int selectables;
     private boolean yield;
     private Selectable selectable;
@@ -109,7 +112,7 @@ public class ReactorImpl implements Reactor {
         collector = (CollectorImpl)Proton.collector();
         global = new IOHandler();
         handler = new BaseHandler();
-        children = new HashSet<Selectable>();
+        children = new HashSet<ReactorChild>();
         selectables = 0;
         timer = new Timer(collector);
         wakeup = Pipe.open();
@@ -182,7 +185,7 @@ public class ReactorImpl implements Reactor {
  */
 
     @Override
-    public Set<Selectable> children() {
+    public Set<ReactorChild> children() {
         return children;
     }
 
@@ -442,4 +445,16 @@ public class ReactorImpl implements Reactor {
     protected void setSelector(Selector selector) {
         this.selector = selector;
     }
+
+    // pn_reactor_connection from connection.c
+    @Override
+    public Connection connection(Handler handler) {
+        Connection connection = Proton.connection();
+        connection.add(handler);
+        connection.collect(collector);
+        children.add(connection);
+        ((ConnectionImpl)connection).setReactor(this);
+        // TODO: C code adds a reference back to the reactor from connection
+        return connection;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
index c74853e..35a6555 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
@@ -44,27 +44,34 @@ public class SelectorImpl implements Selector {
 
     @Override
     public void add(Selectable selectable) throws IOException {
-        selectable.getChannel().configureBlocking(false);
-        SelectionKey key = selectable.getChannel().register(selector, 0);
-        key.attach(selectable);
+        // TODO: valid for selectable to have a 'null' channel - in this case it can only expire...
+        if (selectable.getChannel() != null) {
+            selectable.getChannel().configureBlocking(false);
+            SelectionKey key = selectable.getChannel().register(selector, 0);
+            key.attach(selectable);
+        }
         selectables.add(selectable);
         update(selectable);
     }
 
     @Override
     public void update(Selectable selectable) {
-        int interestedOps = 0;
-        if (selectable.isReading()) interestedOps |= SelectionKey.OP_READ;
-        if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
-        SelectionKey key = selectable.getChannel().keyFor(selector);
-        key.interestOps(interestedOps);
+        if (selectable.getChannel() != null) {
+            int interestedOps = 0;
+            if (selectable.isReading()) interestedOps |= SelectionKey.OP_READ;
+            if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
+            SelectionKey key = selectable.getChannel().keyFor(selector);
+            key.interestOps(interestedOps);
+        }
     }
 
     @Override
     public void remove(Selectable selectable) {
-        SelectionKey key = selectable.getChannel().keyFor(selector);
-        key.cancel();
-        key.attach(null);
+        if (selectable.getChannel() != null) {
+            SelectionKey key = selectable.getChannel().keyFor(selector);
+            key.cancel();
+            key.attach(null);
+        }
         selectables.remove(selectable);
     }
 
@@ -106,6 +113,7 @@ public class SelectorImpl implements Selector {
             if (key.isReadable()) readable.add(selectable);
             if (key.isWritable()) writeable.add(selectable);
         }
+        selector.selectedKeys().clear();
         for (Selectable selectable : selectables) {    // TODO: this is different to the C code which evaluates expiry at the point the selectable is iterated over.
             long deadline = selectable.getDeadline();
             if (deadline > 0 && awoken >= deadline) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org