You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/07/06 01:45:06 UTC
[05/38] qpid-proton git commit: PROTON-881: Add a Send example,
and supporting changes in the reactor.
PROTON-881: Add a Send example, and supporting changes in the reactor.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cd09de66
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cd09de66
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cd09de66
Branch: refs/heads/master
Commit: cd09de66362580f0c5ceab464d71c7ad4300b517
Parents: 88df5e7
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Tue Apr 21 16:23:12 2015 +0100
Committer: Adrian Preston <pr...@uk.ibm.com>
Committed: Wed May 6 23:23:47 2015 +0100
----------------------------------------------------------------------
.../qpid/proton/example/reactor/Send.java | 142 +++++++++++++++++++
.../apache/qpid/proton/engine/Connection.java | 7 +-
.../qpid/proton/engine/impl/ConnectionImpl.java | 32 ++++-
.../qpid/proton/engine/impl/EventImpl.java | 12 +-
.../qpid/proton/engine/impl/TransportImpl.java | 8 +-
.../apache/qpid/proton/reactor/Handshaker.java | 72 ++++++++++
.../org/apache/qpid/proton/reactor/Reactor.java | 5 +-
.../qpid/proton/reactor/ReactorChild.java | 27 ++++
.../apache/qpid/proton/reactor/Selectable.java | 2 +-
.../qpid/proton/reactor/impl/IOHandler.java | 19 ++-
.../qpid/proton/reactor/impl/ReactorImpl.java | 21 ++-
.../qpid/proton/reactor/impl/SelectorImpl.java | 30 ++--
12 files changed, 342 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
new file mode 100644
index 0000000..5cd5811
--- /dev/null
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.reactor.Handshaker;
+import org.apache.qpid.proton.reactor.Reactor;
+
+/**
+ * Example that sends a single message, written in terms of low level AMQP
+ * events.
+ * <p>
+ * Usage: {@code Send [hostname [content]]}. The hostname defaults to
+ * {@code localhost} and the message content to {@code Hello World!}.
+ */
+public class Send extends BaseHandler {
+
+    /**
+     * Handler attached to the connection created in
+     * {@link Send#onReactorInit(Event)}. It opens the connection, a session
+     * and a sender link, sends the message once the link has credit and then
+     * closes the link, session and connection again.
+     * <p>
+     * The class is static because it keeps its own references to the
+     * hostname and message and never uses the enclosing {@code Send}
+     * instance, so it does not need a hidden reference to it.
+     */
+    private static class SendHandler extends BaseHandler {
+
+        private final String hostname;
+        private final Message message;
+        // Counter used to derive a distinct delivery tag for each delivery.
+        private int nextTag = 0;
+
+        /**
+         * @param hostname hostname that is set on the connection
+         * @param message the message to send when credit is available
+         */
+        private SendHandler(String hostname, Message message) {
+            this.hostname = hostname;
+            this.message = message;
+
+            // Add a child handler that performs some default handshaking
+            // behaviour.
+            add(new Handshaker());
+        }
+
+        /**
+         * Sets the hostname on the new connection, creates a session and a
+         * sender link named "sender" on it, and opens all three.
+         */
+        @Override
+        public void onConnectionInit(Event event) {
+            Connection conn = event.getConnection();
+            conn.setHostname(hostname);
+
+            // Every session or link could have their own handler(s) if we
+            // wanted simply by adding the handler to the given session
+            // or link
+            Session ssn = conn.session();
+
+            // If a link doesn't have an event handler, the events go to
+            // its parent session. If the session doesn't have a handler
+            // the events go to its parent connection. If the connection
+            // doesn't have a handler, the events go to the reactor.
+            Sender snd = ssn.sender("sender");
+            conn.open();
+            ssn.open();
+            snd.open();
+        }
+
+        /**
+         * When the sender has credit, encodes and sends the message as one
+         * delivery, settles it, and then closes the link, its session and
+         * its connection.
+         */
+        @Override
+        public void onLinkFlow(Event event) {
+            Sender snd = (Sender)event.getLink();
+            if (snd.getCredit() > 0 && message != null) {
+                // Start with a 1024 byte buffer and double it each time the
+                // encoder reports that the message does not fit.
+                byte[] msgData = new byte[1024];
+                int length;
+                while(true) {
+                    try {
+                        length = message.encode(msgData, 0, msgData.length);
+                        break;
+                    } catch(BufferOverflowException e) {
+                        msgData = new byte[msgData.length * 2];
+                    }
+                }
+                byte[] tag = String.valueOf(nextTag++).getBytes();
+                Delivery dlv = snd.delivery(tag);
+                snd.send(msgData, 0, length);
+                dlv.settle();
+                snd.advance();
+                snd.close();
+                snd.getSession().close();
+                snd.getSession().getConnection().close();
+            }
+        }
+
+        /**
+         * Prints the description of the transport's error condition to
+         * standard error, or a generic message when no condition is set.
+         */
+        @Override
+        public void onTransportError(Event event) {
+            ErrorCondition condition = event.getTransport().getCondition();
+            if (condition != null) {
+                System.err.println("Error: " + condition.getDescription());
+            } else {
+                System.err.println("Error (no description returned).");
+            }
+        }
+    }
+
+    private final String hostname;
+    private final Message message;
+
+    /**
+     * @param hostname hostname handed to the {@code SendHandler}
+     * @param content text wrapped in an {@link AmqpValue} and used as the
+     *        message body
+     */
+    private Send(String hostname, String content) {
+        this.hostname = hostname;
+        message = Proton.message();
+        message.setBody(new AmqpValue(content));
+    }
+
+    /**
+     * Creates the AMQP connection used for sending and attaches a
+     * {@code SendHandler} to it.
+     */
+    @Override
+    public void onReactorInit(Event event) {
+        // You can use the connection method to create AMQP connections.
+
+        // This connection's handler is the SendHandler object. All the events
+        // for this connection will go to the SendHandler object instead of
+        // going to the reactor. If you were to omit the SendHandler object,
+        // all the events would go to the reactor.
+        event.getReactor().connection(new SendHandler(hostname, message));
+    }
+
+    /**
+     * Runs a reactor with a {@code Send} handler.
+     *
+     * @param args optional hostname (default "localhost") followed by
+     *        optional message content (default "Hello World!")
+     * @throws IOException declared by this example's entry point
+     */
+    public static void main(String[] args) throws IOException {
+        String hostname = args.length > 0 ? args[0] : "localhost";
+        String content = args.length > 1 ? args[1] : "Hello World!";
+
+        Reactor r = Proton.reactor(new Send(hostname, content));
+        r.run();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
index 5cb57a2..3dccbb1 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
@@ -25,6 +25,8 @@ import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.ReactorChild;
/**
@@ -35,7 +37,7 @@ import org.apache.qpid.proton.engine.impl.ConnectionImpl;
* {@link #sessionHead(EnumSet, EnumSet)}, {@link #linkHead(EnumSet, EnumSet)}
* {@link #getWorkHead()} respectively.
*/
-public interface Connection extends HandlerEndpoint
+public interface Connection extends HandlerEndpoint, ReactorChild
{
public static final class Factory
@@ -110,12 +112,15 @@ public interface Connection extends HandlerEndpoint
void setProperties(Map<Symbol,Object> properties);
+ @Override
Object getContext();
+ @Override
void setContext(Object context);
void collect(Collector collector);
Transport getTransport();
+ Reactor getReactor();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
index eecc05e..b018a95 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
@@ -25,9 +25,16 @@ import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+
import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.ProtonJConnection;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.reactor.Reactor;
public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnection
{
@@ -66,6 +73,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
private Object _context;
private CollectorImpl _collector;
+ private Reactor _reactor;
private static final Symbol[] EMPTY_SYMBOL_ARRAY = new Symbol[0];
@@ -77,6 +85,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
{
}
+ @Override
public SessionImpl session()
{
SessionImpl session = new SessionImpl(this);
@@ -154,6 +163,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
}
+ @Override
public Session sessionHead(final EnumSet<EndpointState> local, final EnumSet<EndpointState> remote)
{
if(_sessionHead == null)
@@ -168,6 +178,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
}
}
+ @Override
public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
{
if(_linkHead == null)
@@ -274,6 +285,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
}
}
+ @Override
public int getMaxChannels()
{
return _maxChannels;
@@ -290,6 +302,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
_localContainerId = localContainerId;
}
+ @Override
public DeliveryImpl getWorkHead()
{
return _workHead;
@@ -376,11 +389,13 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
return _properties;
}
+ @Override
public void setProperties(Map<Symbol, Object> properties)
{
_properties = properties;
}
+ @Override
public Map<Symbol, Object> getRemoteProperties()
{
return _remoteProperties;
@@ -391,6 +406,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
_remoteProperties = remoteProperties;
}
+ @Override
public String getHostname()
{
return _localHostname;
@@ -471,6 +487,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
_transport = transport;
}
+ @Override
public TransportImpl getTransport()
{
return _transport;
@@ -497,6 +514,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
throw new UnsupportedOperationException();
}
+ @Override
public DeliveryImpl next()
{
DeliveryImpl next = _next;
@@ -588,16 +606,19 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
}
}
+ @Override
public Object getContext()
{
return _context;
}
+ @Override
public void setContext(Object context)
{
_context = context;
}
+ @Override
public void collect(Collector collector)
{
_collector = (CollectorImpl) collector;
@@ -637,4 +658,13 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec
{
put(Event.Type.CONNECTION_LOCAL_CLOSE, this);
}
+
+ /**
+ * Returns the reactor stored on this connection by {@link #setReactor(Reactor)}.
+ * NOTE(review): the backing field has no initializer in the visible hunk, so
+ * this presumably returns null until setReactor has been called - confirm.
+ */
+ @Override
+ public Reactor getReactor() {
+ return _reactor;
+ }
+
+ /**
+ * Records the reactor that {@link #getReactor()} returns for this connection.
+ *
+ * @param reactor the reactor to associate with this connection
+ */
+ public void setReactor(Reactor reactor) {
+ _reactor = reactor;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
index 65a2000..6abec58 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
@@ -295,17 +295,13 @@ class EventImpl implements Event
} else if (context instanceof Transport) {
return ((TransportImpl)context).getReactor();
} else if (context instanceof Delivery) {
- Transport transport = ((Delivery)context).getLink().getSession().getConnection().getTransport();
- return ((TransportImpl)transport).getReactor();
+ return ((Delivery)context).getLink().getSession().getConnection().getReactor();
} else if (context instanceof Link) {
- Transport transport = ((Link)context).getSession().getConnection().getTransport();
- return ((TransportImpl)transport).getReactor();
+ return ((Link)context).getSession().getConnection().getReactor();
} else if (context instanceof Session) {
- Transport transport = ((Session)context).getConnection().getTransport();
- return ((TransportImpl)transport).getReactor();
+ return ((Session)context).getConnection().getReactor();
} else if (context instanceof Connection) {
- Transport transport = ((Connection)context).getTransport();
- return ((TransportImpl)transport).getReactor();
+ return ((Connection)context).getReactor();
} else if (context instanceof Selectable) {
return ((Selectable)context).getReactor();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index f4813cd..694e23b 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -121,6 +121,7 @@ public class TransportImpl extends EndpointImpl
private boolean postedHeadClosed = false;
private boolean postedTailClosed = false;
+ private boolean postedTransportError = false;
private int _localIdleTimeout = 0;
private int _remoteIdleTimeout = 0;
@@ -591,7 +592,9 @@ public class TransportImpl extends EndpointImpl
session.incrementOutgoingBytes(-delta);
}
- getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
+ if (snd.getLocalState() != EndpointState.CLOSED) {
+ getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
+ }
}
if(wasDone && delivery.getLocalState() != null)
@@ -1319,8 +1322,9 @@ public class TransportImpl extends EndpointImpl
}
_head_closed = true;
}
- if (_condition != null) {
+ if (_condition != null && !postedTransportError) {
put(Event.Type.TRANSPORT_ERROR, this);
+ postedTransportError = true;
}
if (!postedTailClosed) {
put(Event.Type.TRANSPORT_TAIL_CLOSED, this);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
new file mode 100644
index 0000000..f9b670a
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+
+/**
+ * Handler that performs default handshaking: it answers a remote open or
+ * close of a connection, session or link with the matching local open or
+ * close.
+ */
+public class Handshaker extends BaseHandler {
+
+    // ---- connection ----
+
+    @Override
+    public void onConnectionRemoteOpen(Event event) {
+        openIfUninitialized(event.getConnection());
+    }
+
+    @Override
+    public void onConnectionRemoteClose(Event event) {
+        closeUnlessClosed(event.getConnection());
+    }
+
+    // ---- session ----
+
+    @Override
+    public void onSessionRemoteOpen(Event event) {
+        openIfUninitialized(event.getSession());
+    }
+
+    @Override
+    public void onSessionRemoteClose(Event event) {
+        closeUnlessClosed(event.getSession());
+    }
+
+    // ---- link ----
+
+    @Override
+    public void onLinkRemoteOpen(Event event) {
+        openIfUninitialized(event.getLink());
+    }
+
+    @Override
+    public void onLinkRemoteClose(Event event) {
+        closeUnlessClosed(event.getLink());
+    }
+
+    /** Opens the endpoint only when its local state is still UNINITIALIZED. */
+    private void openIfUninitialized(Endpoint target) {
+        final EndpointState localState = target.getLocalState();
+        if (localState != EndpointState.UNINITIALIZED) {
+            return;
+        }
+        target.open();
+    }
+
+    /** Closes the endpoint unless its local state is already CLOSED. */
+    private void closeUnlessClosed(Endpoint target) {
+        final EndpointState localState = target.getLocalState();
+        if (localState == EndpointState.CLOSED) {
+            return;
+        }
+        target.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
index 02c5de2..0c56a48 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.Set;
import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.reactor.impl.ReactorImpl;
@@ -64,7 +65,7 @@ public interface Reactor {
*/
- public Set<Selectable> children();
+ public Set<ReactorChild> children();
public Collector collector();
@@ -93,7 +94,7 @@ public interface Reactor {
// pn_reactor_schedule from reactor.c
public Task schedule(int delay, Handler handler);
// TODO: acceptor
- // TODO: connection
// TODO: acceptorClose
+ Connection connection(Handler handler);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
new file mode 100644
index 0000000..d020d1a
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor;
+
+/** Tagging (marker) interface, declaring no methods, used to identify classes that can be a child of a {@link Reactor}. */
+public interface ReactorChild {
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
index c2b560f..7bb64c7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
@@ -27,7 +27,7 @@ import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Transport;
-public interface Selectable {
+public interface Selectable extends ReactorChild {
public interface Callback {
void run(Selectable selectable);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index 08aca1f..ee988ee 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -22,6 +22,7 @@
package org.apache.qpid.proton.reactor.impl;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
@@ -96,16 +97,20 @@ public class IOHandler extends BaseHandler {
hostname = hostname.substring(0, colonIndex);
}
- Transport transport = event.getTransport();
+ Transport transport = event.getConnection().getTransport();
Socket socket = null; // TODO: null is our equivalent of PN_INVALID_SOCKET
try {
- socket = new Socket(hostname, port);
+ SocketChannel socketChannel = SocketChannel.open();
+ socketChannel.connect(new InetSocketAddress(hostname, port));
+ socket = socketChannel.socket();
} catch(IOException ioException) {
- ErrorCondition condition = transport.getCondition();
+ ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.getSymbol("proton:io"));
condition.setDescription(ioException.getMessage());
+ transport.setCondition(condition);
transport.close_tail();
transport.close_head();
+ transport.pop(transport.pending()); // TODO: force generation of TRANSPORT_HEAD_CLOSE (not in C code)
}
selectableTransport(reactor, socket, transport);
}
@@ -170,9 +175,10 @@ public class IOHandler extends BaseHandler {
transport.process();
}
} catch (IOException e) {
- ErrorCondition condition = transport.getCondition();
+ ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.getSymbol("proton:io"));
condition.setDescription(e.getMessage());
+ transport.setCondition(condition);
transport.close_tail();
}
}
@@ -201,9 +207,10 @@ public class IOHandler extends BaseHandler {
transport.pop(n);
}
} catch(IOException ioException) {
- ErrorCondition condition = transport.getCondition();
+ ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.getSymbol("proton:io"));
condition.setDescription(ioException.getMessage());
+ transport.setCondition(condition);
transport.close_head();
}
}
@@ -259,7 +266,7 @@ public class IOHandler extends BaseHandler {
private Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) {
// TODO: this code needs to be able to deal with a null socket (this is our equivalent of PN_INVALID_SOCKET)
Selectable selectable = reactor.selectable();
- selectable.setChannel(socket.getChannel());
+ selectable.setChannel(socket != null ? socket.getChannel() : null);
selectable.onReadable(new ConnectionReadable());
selectable.onWritable(new ConnectionWritable());
selectable.onError(new ConnectionError());
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index 5072958..0a7f84d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -30,12 +30,15 @@ import java.util.Set;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.HandlerEndpointImpl;
import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.ReactorChild;
import org.apache.qpid.proton.reactor.Selectable;
import org.apache.qpid.proton.reactor.Selectable.Callback;
import org.apache.qpid.proton.reactor.Selectable.RecordKeyType;
@@ -68,7 +71,7 @@ public class ReactorImpl implements Reactor {
private long timeout;
private Handler global;
private Handler handler;
- private Set<Selectable> children;
+ private Set<ReactorChild> children;
private int selectables;
private boolean yield;
private Selectable selectable;
@@ -109,7 +112,7 @@ public class ReactorImpl implements Reactor {
collector = (CollectorImpl)Proton.collector();
global = new IOHandler();
handler = new BaseHandler();
- children = new HashSet<Selectable>();
+ children = new HashSet<ReactorChild>();
selectables = 0;
timer = new Timer(collector);
wakeup = Pipe.open();
@@ -182,7 +185,7 @@ public class ReactorImpl implements Reactor {
*/
@Override
- public Set<Selectable> children() {
+ public Set<ReactorChild> children() {
return children;
}
@@ -442,4 +445,16 @@ public class ReactorImpl implements Reactor {
protected void setSelector(Selector selector) {
this.selector = selector;
}
+
+ // pn_reactor_connection from connection.c
+ /**
+ * Creates a new connection that is a child of this reactor.
+ *
+ * @param handler handler added to the new connection
+ * @return the newly created connection
+ */
+ @Override
+ public Connection connection(Handler handler) {
+ Connection connection = Proton.connection();
+ connection.add(handler);
+ // Use this reactor's collector for the connection.
+ connection.collect(collector);
+ children.add(connection);
+ // setReactor is declared on ConnectionImpl, not on the Connection interface, hence the cast.
+ ((ConnectionImpl)connection).setReactor(this);
+ // TODO: C code adds a reference back to the reactor from connection
+ return connection;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
index c74853e..35a6555 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
@@ -44,27 +44,34 @@ public class SelectorImpl implements Selector {
@Override
public void add(Selectable selectable) throws IOException {
- selectable.getChannel().configureBlocking(false);
- SelectionKey key = selectable.getChannel().register(selector, 0);
- key.attach(selectable);
+ // TODO: valid for selectable to have a 'null' channel - in this case it can only expire...
+ if (selectable.getChannel() != null) {
+ selectable.getChannel().configureBlocking(false);
+ SelectionKey key = selectable.getChannel().register(selector, 0);
+ key.attach(selectable);
+ }
selectables.add(selectable);
update(selectable);
}
@Override
public void update(Selectable selectable) {
- int interestedOps = 0;
- if (selectable.isReading()) interestedOps |= SelectionKey.OP_READ;
- if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
- SelectionKey key = selectable.getChannel().keyFor(selector);
- key.interestOps(interestedOps);
+ if (selectable.getChannel() != null) {
+ int interestedOps = 0;
+ if (selectable.isReading()) interestedOps |= SelectionKey.OP_READ;
+ if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
+ SelectionKey key = selectable.getChannel().keyFor(selector);
+ key.interestOps(interestedOps);
+ }
}
@Override
public void remove(Selectable selectable) {
- SelectionKey key = selectable.getChannel().keyFor(selector);
- key.cancel();
- key.attach(null);
+ if (selectable.getChannel() != null) {
+ SelectionKey key = selectable.getChannel().keyFor(selector);
+ key.cancel();
+ key.attach(null);
+ }
selectables.remove(selectable);
}
@@ -106,6 +113,7 @@ public class SelectorImpl implements Selector {
if (key.isReadable()) readable.add(selectable);
if (key.isWritable()) writeable.add(selectable);
}
+ selector.selectedKeys().clear();
for (Selectable selectable : selectables) { // TODO: this is different to the C code which evaluates expiry at the point the selectable is iterated over.
long deadline = selectable.getDeadline();
if (deadline > 0 && awoken >= deadline) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org