You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/05/12 21:24:08 UTC
[3/3] git commit: https://issues.apache.org/jira/browse/AMQ-5183
https://issues.apache.org/jira/browse/AMQ-5183
Swithc to using Proton's Evet collector for processing engine state
changes. All tests passing locally with this change.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/38a86b47
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/38a86b47
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/38a86b47
Branch: refs/heads/trunk
Commit: 38a86b470f26e2f6ab10fdfc16486d897926b2ed
Parents: 1dd34a1
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon May 12 15:23:42 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon May 12 15:23:42 2014 -0400
----------------------------------------------------------------------
.../transport/amqp/AmqpProtocolConverter.java | 121 ++++++++++---------
1 file changed, 65 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/38a86b47/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index a0560b4..02621fc 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -78,16 +77,19 @@ import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.EngineFactory;
+import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.apache.qpid.proton.engine.impl.EngineFactoryImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
@@ -109,11 +111,6 @@ import org.slf4j.LoggerFactory;
class AmqpProtocolConverter implements IAmqpProtocolConverter {
static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
- public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
- public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
- public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
- public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
- public static final EnumSet<EndpointState> ALL_STATES = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
static final public byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private final AmqpTransport amqpTransport;
@@ -131,6 +128,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
Transport protonTransport = engineFactory.createTransport();
Connection protonConnection = engineFactory.createConnection();
MessageFactory messageFactory = messageFactoryLoader.loadFactory();
+ Collector eventCollector = new CollectorImpl();
public AmqpProtocolConverter(AmqpTransport transport) {
this.amqpTransport = transport;
@@ -145,6 +143,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
this.protonTransport.setMaxFrameSize(maxFrameSize);
this.protonTransport.bind(this.protonConnection);
+ this.protonConnection.collect(eventCollector);
updateTracer();
}
@@ -171,14 +170,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
while (!done) {
ByteBuffer toWrite = protonTransport.getOutputBuffer();
if (toWrite != null && toWrite.hasRemaining()) {
-// // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
amqpTransport.sendToAmqp(toWrite);
protonTransport.outputConsumed();
} else {
done = true;
}
}
- // System.out.println("write done");
} catch (IOException e) {
amqpTransport.onException(e);
}
@@ -208,7 +205,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
AmqpHeader header = (AmqpHeader) command;
switch (header.getProtocolId()) {
case 0:
- // amqpTransport.sendToAmqp(new AmqpHeader());
break; // nothing to do..
case 3: // Client will be using SASL for auth..
sasl = protonTransport.sasl();
@@ -225,7 +221,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
public void onFrame(Buffer frame) throws Exception {
- // System.out.println("read: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
while (frame.length > 0) {
try {
int count = protonTransport.input(frame.data, frame.offset, frame.length);
@@ -263,54 +258,30 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
- // Handle the amqp open..
- if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) {
- onConnectionOpen();
- }
-
- // Lets map amqp sessions to openwire sessions..
- Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
- while (session != null) {
- onSessionOpen(session);
- session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
- }
-
- Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
- while (link != null) {
- onLinkOpen(link);
- link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
- }
-
- Delivery delivery = protonConnection.getWorkHead();
- while (delivery != null) {
- AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
- if (listener != null) {
- listener.onDelivery(delivery);
+ Event event = null;
+ while ((event = eventCollector.peek()) != null) {
+ switch (event.getType()) {
+ case CONNECTION_REMOTE_STATE:
+ processConnectionEvent(event.getConnection());
+ break;
+ case SESSION_REMOTE_STATE:
+ processSessionEvent(event.getSession());
+ break;
+ case LINK_REMOTE_STATE:
+ processLinkEvent(event.getLink());
+ break;
+ case LINK_FLOW:
+ Link link = event.getLink();
+ ((AmqpDeliveryListener) link.getContext()).drainCheck();
+ break;
+ case DELIVERY:
+ processDelivery(event.getDelivery());
+ break;
+ default:
+ break;
}
- delivery = delivery.getWorkNext();
- }
- link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
- while (link != null) {
- ((AmqpDeliveryListener) link.getContext()).onClose();
- link.close();
- link = link.next(ACTIVE_STATE, CLOSED_STATE);
- }
-
- link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
- while (link != null) {
- ((AmqpDeliveryListener) link.getContext()).drainCheck();
- link = link.next(ACTIVE_STATE, ALL_STATES);
- }
-
- session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
- while (session != null) {
- // TODO - close links?
- onSessionClose(session);
- session = session.next(ACTIVE_STATE, CLOSED_STATE);
- }
- if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
- doClose();
+ eventCollector.pop();
}
} catch (Throwable e) {
@@ -321,6 +292,44 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
+ protected void processConnectionEvent(Connection connection) throws Exception {
+ EndpointState remoteState = connection.getRemoteState();
+ if (remoteState == EndpointState.ACTIVE) {
+ onConnectionOpen();
+ } else if (remoteState == EndpointState.CLOSED) {
+ doClose();
+ }
+ }
+
+ protected void processLinkEvent(Link link) throws Exception {
+ EndpointState remoteState = link.getRemoteState();
+ if (remoteState == EndpointState.ACTIVE) {
+ onLinkOpen(link);
+ } else if (remoteState == EndpointState.CLOSED) {
+ ((AmqpDeliveryListener) link.getContext()).onClose();
+ link.close();
+ }
+ }
+
+ protected void processSessionEvent(Session session) throws Exception {
+ EndpointState remoteState = session.getRemoteState();
+ if (remoteState == EndpointState.ACTIVE) {
+ onSessionOpen(session);
+ } else if (remoteState == EndpointState.CLOSED) {
+ // TODO - close links?
+ onSessionClose(session);
+ }
+ }
+
+ protected void processDelivery(Delivery delivery) throws Exception {
+ if (!delivery.isPartial()) {
+ AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
+ if (listener != null) {
+ listener.onDelivery(delivery);
+ }
+ }
+ }
+
boolean closing = false;
boolean closedSocket = false;