You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/24 23:09:45 UTC

[4/4] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5591

https://issues.apache.org/jira/browse/AMQ-5591

Refactoring of the AMQP protocol stack to allow for more flexibility in
adding support for some additional AMQP semantics and group together
common functionality handling to avoid having to fix simillar issues in
multiple places.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3306467a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3306467a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3306467a

Branch: refs/heads/master
Commit: 3306467a6407e164fb7b8304eb6c0dc5cb67a696
Parents: e33b3f5
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 24 18:09:28 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Mar 24 18:09:28 2015 -0400

----------------------------------------------------------------------
 .../amqp/AMQPProtocolDiscriminator.java         |  121 --
 .../transport/amqp/AMQPSslTransportFactory.java |   75 -
 .../activemq/transport/amqp/AmqpHeader.java     |    2 +
 .../transport/amqp/AmqpInactivityMonitor.java   |   10 +-
 .../transport/amqp/AmqpProtocolConverter.java   | 1699 +-----------------
 .../amqp/AmqpProtocolDiscriminator.java         |  114 ++
 .../transport/amqp/AmqpSslTransportFactory.java |   75 +
 .../activemq/transport/amqp/AmqpSupport.java    |   39 +-
 .../activemq/transport/amqp/AmqpTransport.java  |    5 +-
 .../transport/amqp/AmqpTransportFilter.java     |   22 +-
 .../activemq/transport/amqp/AmqpWireFormat.java |   20 +
 .../transport/amqp/AmqpWireFormatFactory.java   |    2 +-
 .../transport/amqp/IAmqpProtocolConverter.java  |   36 -
 .../transport/amqp/ResponseHandler.java         |   19 +-
 .../amqp/protocol/AmqpAbstractLink.java         |  167 ++
 .../amqp/protocol/AmqpAbstractReceiver.java     |  106 ++
 .../transport/amqp/protocol/AmqpConnection.java |  742 ++++++++
 .../transport/amqp/protocol/AmqpLink.java       |   96 +
 .../transport/amqp/protocol/AmqpReceiver.java   |  254 +++
 .../transport/amqp/protocol/AmqpResource.java   |   34 +
 .../transport/amqp/protocol/AmqpSender.java     |  451 +++++
 .../transport/amqp/protocol/AmqpSession.java    |  365 ++++
 .../protocol/AmqpTransactionCoordinator.java    |  162 ++
 .../amqp/protocol/AmqpTransferTagGenerator.java |  103 ++
 .../org/apache/activemq/transport/amqp+ssl      |    6 +-
 .../transport/amqp/AmqpTestSupport.java         |    3 +-
 .../amqp/interop/AmqpTempDestinationTest.java   |    6 +-
 27 files changed, 2803 insertions(+), 1931 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
deleted file mode 100644
index f5b457b..0000000
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.Command;
-
-/**
- * Used to assign the best implementation of a AmqpProtocolConverter to the
- * AmqpTransport based on the AmqpHeader that the client sends us.
- */
-public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
-
-    public static final int DEFAULT_PREFETCH = 1000;
-
-    private final AmqpTransport transport;
-    private final BrokerService brokerService;
-
-    private int producerCredit = DEFAULT_PREFETCH;
-
-    interface Discriminator {
-        boolean matches(AmqpHeader header);
-
-        IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService);
-    }
-
-    static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
-    static {
-        DISCRIMINATORS.add(new Discriminator() {
-
-            @Override
-            public IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService) {
-                return new AmqpProtocolConverter(transport, brokerService);
-            }
-
-            @Override
-            public boolean matches(AmqpHeader header) {
-                switch (header.getProtocolId()) {
-                    case 0:
-                    case 3:
-                        if (header.getMajor() == 1 && header.getMinor() == 0 && header.getRevision() == 0) {
-                            return true;
-                        }
-                }
-                return false;
-            }
-        });
-    }
-
-    final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
-
-    public AMQPProtocolDiscriminator(AmqpTransport transport, BrokerService brokerService) {
-        this.transport = transport;
-        this.brokerService = brokerService;
-    }
-
-    @Override
-    public void onAMQPData(Object command) throws Exception {
-        if (command.getClass() == AmqpHeader.class) {
-            AmqpHeader header = (AmqpHeader) command;
-
-            Discriminator match = null;
-            for (Discriminator discriminator : DISCRIMINATORS) {
-                if (discriminator.matches(header)) {
-                    match = discriminator;
-                }
-            }
-
-            // Lets use first in the list if none are a good match.
-            if (match == null) {
-                match = DISCRIMINATORS.get(0);
-            }
-
-            IAmqpProtocolConverter next = match.create(transport, brokerService);
-            next.setProducerCredit(producerCredit);
-            transport.setProtocolConverter(next);
-            for (Command send : pendingCommands) {
-                next.onActiveMQCommand(send);
-            }
-            pendingCommands.clear();
-            next.onAMQPData(command);
-        } else {
-            throw new IllegalStateException();
-        }
-    }
-
-    @Override
-    public void onAMQPException(IOException error) {
-    }
-
-    @Override
-    public void onActiveMQCommand(Command command) throws Exception {
-        pendingCommands.add(command);
-    }
-
-    @Override
-    public void updateTracer() {
-    }
-
-    @Override
-    public void setProducerCredit(int producerCredit) {
-        this.producerCredit = producerCredit;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
deleted file mode 100644
index 12bd526..0000000
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.transport.MutexTransport;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.tcp.SslTransportFactory;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.wireformat.WireFormat;
-
-/**
- * A <a href="http://amqp.org/">AMQP</a> over SSL transport factory
- */
-public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
-
-    private BrokerService brokerService = null;
-
-    @Override
-    protected String getDefaultWireFormatType() {
-        return "amqp";
-    }
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new AmqpTransportFilter(transport, format, brokerService);
-        IntrospectionSupport.setProperties(transport, options);
-        return super.compositeConfigure(transport, format, options);
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
-        transport = super.serverConfigure(transport, format, options);
-
-        // strip off the mutex transport.
-        if (transport instanceof MutexTransport) {
-            transport = ((MutexTransport) transport).getNext();
-        }
-
-        return transport;
-    }
-
-    @Override
-    public void setBrokerService(BrokerService brokerService) {
-        this.brokerService = brokerService;
-    }
-
-    @Override
-    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
-        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
-        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
-        filter.setInactivityMonitor(monitor);
-        return monitor;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
index 2597b2d..d019277 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
@@ -19,6 +19,8 @@ package org.apache.activemq.transport.amqp;
 import org.fusesource.hawtbuf.Buffer;
 
 /**
+ * Represents the AMQP protocol handshake packet that is sent during the
+ * initial exchange with a remote peer.
  */
 public class AmqpHeader {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
index 065559d..8cf6488 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
@@ -44,7 +44,7 @@ public class AmqpInactivityMonitor extends TransportFilter {
     private static Timer ACTIVITY_CHECK_TIMER;
 
     private final AtomicBoolean failed = new AtomicBoolean(false);
-    private IAmqpProtocolConverter protocolConverter;
+    private AmqpProtocolConverter protocolConverter;
 
     private long connectionTimeout = AmqpWireFormat.DEFAULT_CONNECTION_TIMEOUT;
     private SchedulerTimerTask connectCheckerTask;
@@ -98,15 +98,15 @@ public class AmqpInactivityMonitor extends TransportFilter {
         }
     }
 
-    public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
+    public void setProtocolConverter(AmqpProtocolConverter protocolConverter) {
         this.protocolConverter = protocolConverter;
     }
 
-    public IAmqpProtocolConverter getProtocolConverter() {
+    public AmqpProtocolConverter getProtocolConverter() {
         return protocolConverter;
     }
 
-    synchronized void startConnectChecker(long connectionTimeout) {
+    public synchronized void startConnectChecker(long connectionTimeout) {
         this.connectionTimeout = connectionTimeout;
         if (connectionTimeout > 0 && connectCheckerTask == null) {
             connectCheckerTask = new SchedulerTimerTask(connectChecker);
@@ -124,7 +124,7 @@ public class AmqpInactivityMonitor extends TransportFilter {
         }
     }
 
-    synchronized void stopConnectChecker() {
+    public synchronized void stopConnectChecker() {
         if (connectCheckerTask != null) {
             connectCheckerTask.cancel();
             connectCheckerTask = null;