You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/05/12 21:24:08 UTC

[3/3] git commit: https://issues.apache.org/jira/browse/AMQ-5183

https://issues.apache.org/jira/browse/AMQ-5183

Swithc to using Proton's Evet collector for processing engine state
changes.  All tests passing locally with this change.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/38a86b47
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/38a86b47
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/38a86b47

Branch: refs/heads/trunk
Commit: 38a86b470f26e2f6ab10fdfc16486d897926b2ed
Parents: 1dd34a1
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon May 12 15:23:42 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon May 12 15:23:42 2014 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 121 ++++++++++---------
 1 file changed, 65 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/38a86b47/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index a0560b4..02621fc 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -78,16 +77,19 @@ import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.EngineFactory;
+import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
 import org.apache.qpid.proton.engine.impl.EngineFactoryImpl;
 import org.apache.qpid.proton.engine.impl.ProtocolTracer;
 import org.apache.qpid.proton.engine.impl.TransportImpl;
@@ -109,11 +111,6 @@ import org.slf4j.LoggerFactory;
 class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
     static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
-    public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
-    public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
-    public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
-    public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
-    public static final EnumSet<EndpointState> ALL_STATES = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
     private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
     static final public byte[] EMPTY_BYTE_ARRAY = new byte[] {};
     private final AmqpTransport amqpTransport;
@@ -131,6 +128,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     Transport protonTransport = engineFactory.createTransport();
     Connection protonConnection = engineFactory.createConnection();
     MessageFactory messageFactory = messageFactoryLoader.loadFactory();
+    Collector eventCollector = new CollectorImpl();
 
     public AmqpProtocolConverter(AmqpTransport transport) {
         this.amqpTransport = transport;
@@ -145,6 +143,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
         this.protonTransport.setMaxFrameSize(maxFrameSize);
         this.protonTransport.bind(this.protonConnection);
+        this.protonConnection.collect(eventCollector);
         updateTracer();
     }
 
@@ -171,14 +170,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             while (!done) {
                 ByteBuffer toWrite = protonTransport.getOutputBuffer();
                 if (toWrite != null && toWrite.hasRemaining()) {
-//                  // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
                     amqpTransport.sendToAmqp(toWrite);
                     protonTransport.outputConsumed();
                 } else {
                     done = true;
                 }
             }
-            // System.out.println("write done");
         } catch (IOException e) {
             amqpTransport.onException(e);
         }
@@ -208,7 +205,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             AmqpHeader header = (AmqpHeader) command;
             switch (header.getProtocolId()) {
                 case 0:
-                    // amqpTransport.sendToAmqp(new AmqpHeader());
                     break; // nothing to do..
                 case 3: // Client will be using SASL for auth..
                     sasl = protonTransport.sasl();
@@ -225,7 +221,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     }
 
     public void onFrame(Buffer frame) throws Exception {
-        // System.out.println("read: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
         while (frame.length > 0) {
             try {
                 int count = protonTransport.input(frame.data, frame.offset, frame.length);
@@ -263,54 +258,30 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                     }
                 }
 
-                // Handle the amqp open..
-                if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) {
-                    onConnectionOpen();
-                }
-
-                // Lets map amqp sessions to openwire sessions..
-                Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
-                while (session != null) {
-                    onSessionOpen(session);
-                    session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
-                }
-
-                Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
-                while (link != null) {
-                    onLinkOpen(link);
-                    link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
-                }
-
-                Delivery delivery = protonConnection.getWorkHead();
-                while (delivery != null) {
-                    AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
-                    if (listener != null) {
-                        listener.onDelivery(delivery);
+                Event event = null;
+                while ((event = eventCollector.peek()) != null) {
+                    switch (event.getType()) {
+                        case CONNECTION_REMOTE_STATE:
+                            processConnectionEvent(event.getConnection());
+                            break;
+                        case SESSION_REMOTE_STATE:
+                            processSessionEvent(event.getSession());
+                            break;
+                        case LINK_REMOTE_STATE:
+                            processLinkEvent(event.getLink());
+                            break;
+                        case LINK_FLOW:
+                            Link link = event.getLink();
+                            ((AmqpDeliveryListener) link.getContext()).drainCheck();
+                            break;
+                        case DELIVERY:
+                            processDelivery(event.getDelivery());
+                            break;
+                        default:
+                            break;
                     }
-                    delivery = delivery.getWorkNext();
-                }
 
-                link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
-                while (link != null) {
-                    ((AmqpDeliveryListener) link.getContext()).onClose();
-                    link.close();
-                    link = link.next(ACTIVE_STATE, CLOSED_STATE);
-                }
-
-                link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
-                while (link != null) {
-                    ((AmqpDeliveryListener) link.getContext()).drainCheck();
-                    link = link.next(ACTIVE_STATE, ALL_STATES);
-                }
-
-                session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
-                while (session != null) {
-                    // TODO - close links?
-                    onSessionClose(session);
-                    session = session.next(ACTIVE_STATE, CLOSED_STATE);
-                }
-                if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
-                    doClose();
+                    eventCollector.pop();
                 }
 
             } catch (Throwable e) {
@@ -321,6 +292,44 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
     }
 
+    protected void processConnectionEvent(Connection connection) throws Exception {
+        EndpointState remoteState = connection.getRemoteState();
+        if (remoteState == EndpointState.ACTIVE) {
+            onConnectionOpen();
+        } else if (remoteState == EndpointState.CLOSED) {
+            doClose();
+        }
+    }
+
+    protected void processLinkEvent(Link link) throws Exception {
+        EndpointState remoteState = link.getRemoteState();
+        if (remoteState == EndpointState.ACTIVE) {
+            onLinkOpen(link);
+        } else if (remoteState == EndpointState.CLOSED) {
+            ((AmqpDeliveryListener) link.getContext()).onClose();
+            link.close();
+        }
+    }
+
+    protected void processSessionEvent(Session session) throws Exception {
+        EndpointState remoteState = session.getRemoteState();
+        if (remoteState == EndpointState.ACTIVE) {
+            onSessionOpen(session);
+        } else if (remoteState == EndpointState.CLOSED) {
+            // TODO - close links?
+            onSessionClose(session);
+        }
+    }
+
+    protected void processDelivery(Delivery delivery) throws Exception {
+        if (!delivery.isPartial()) {
+            AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
+            if (listener != null) {
+                listener.onDelivery(delivery);
+            }
+        }
+    }
+
     boolean closing = false;
     boolean closedSocket = false;