You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/28 19:58:31 UTC

git commit: Make BrokerService visible to the protocol converter so that in the future we can use it to better manage durable subscriptions and link reattach behavior.

Repository: activemq
Updated Branches:
  refs/heads/trunk 135226533 -> adafdfe97


Make BrokerService visible to the protocol converter so that in the
future we can use it to better manage durable subscriptions and link
reattach behavior.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/adafdfe9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/adafdfe9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/adafdfe9

Branch: refs/heads/trunk
Commit: adafdfe97d9522d399ebe32f6c28fe31619b15a9
Parents: 1352265
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 28 14:58:17 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 28 14:58:17 2014 -0400

----------------------------------------------------------------------
 .../amqp/AMQPProtocolDiscriminator.java         | 19 ++++++++++------
 .../transport/amqp/AMQPSslTransportFactory.java |  7 +++---
 .../transport/amqp/AmqpNioTransportFactory.java |  7 +++---
 .../transport/amqp/AmqpProtocolConverter.java   | 24 ++++++++++++++++++--
 .../activemq/transport/amqp/AmqpSupport.java    |  1 +
 .../transport/amqp/AmqpTransportFactory.java    |  7 +++---
 .../transport/amqp/AmqpTransportFilter.java     |  6 ++---
 7 files changed, 47 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
index 09478d7..a7607af 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.Command;
 
 /**
@@ -29,14 +30,16 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
 
     private static final int DEFAULT_PREFETCH = 100;
 
-    final private AmqpTransport transport;
+    private final AmqpTransport transport;
+    private final BrokerService brokerService;
+
     private int prefetch = DEFAULT_PREFETCH;
     private int producerCredit = DEFAULT_PREFETCH;
 
     interface Discriminator {
         boolean matches(AmqpHeader header);
 
-        IAmqpProtocolConverter create(AmqpTransport transport);
+        IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService);
     }
 
     static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
@@ -44,8 +47,8 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
         DISCRIMINATORS.add(new Discriminator() {
 
             @Override
-            public IAmqpProtocolConverter create(AmqpTransport transport) {
-                return new AmqpProtocolConverter(transport);
+            public IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService) {
+                return new AmqpProtocolConverter(transport, brokerService);
             }
 
             @Override
@@ -60,13 +63,13 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
                 return false;
             }
         });
-
     }
 
     final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
 
-    public AMQPProtocolDiscriminator(AmqpTransport transport) {
+    public AMQPProtocolDiscriminator(AmqpTransport transport, BrokerService brokerService) {
         this.transport = transport;
+        this.brokerService = brokerService;
     }
 
     @Override
@@ -80,11 +83,13 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
                     match = discriminator;
                 }
             }
+
             // Lets use first in the list if none are a good match.
             if (match == null) {
                 match = DISCRIMINATORS.get(0);
             }
-            IAmqpProtocolConverter next = match.create(transport);
+
+            IAmqpProtocolConverter next = match.create(transport, brokerService);
             next.setPrefetch(prefetch);
             next.setProducerCredit(producerCredit);
             transport.setProtocolConverter(next);

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
index 4c036a9..4d7af7e 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.MutexTransport;
@@ -33,7 +32,7 @@ import org.apache.activemq.wireformat.WireFormat;
  */
 public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
 
-    private BrokerContext brokerContext = null;
+    private BrokerService brokerService = null;
 
     @Override
     protected String getDefaultWireFormatType() {
@@ -43,7 +42,7 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
     @Override
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new AmqpTransportFilter(transport, format, brokerContext);
+        transport = new AmqpTransportFilter(transport, format, brokerService);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
@@ -63,7 +62,7 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
 
     @Override
     public void setBrokerService(BrokerService brokerService) {
-        this.brokerContext = brokerService.getBrokerContext();
+        this.brokerService = brokerService;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
index c67d3b6..b017937 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
 
-import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.MutexTransport;
@@ -43,7 +42,7 @@ import org.apache.activemq.wireformat.WireFormat;
  */
 public class AmqpNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
 
-    private BrokerContext brokerContext = null;
+    private BrokerService brokerService = null;
 
     @Override
     protected String getDefaultWireFormatType() {
@@ -81,14 +80,14 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
     @Override
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new AmqpTransportFilter(transport, format, brokerContext);
+        transport = new AmqpTransportFilter(transport, format, brokerService);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
 
     @Override
     public void setBrokerService(BrokerService brokerService) {
-        this.brokerContext = brokerService.getBrokerContext();
+        this.brokerService = brokerService;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 62966bc..7b3b825 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -19,10 +19,12 @@ package org.apache.activemq.transport.amqp;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -30,6 +32,7 @@ import javax.jms.Destination;
 import javax.jms.InvalidClientIDException;
 import javax.jms.InvalidSelectorException;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTempQueue;
@@ -55,8 +58,10 @@ import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.store.PersistenceAdapterSupport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -114,7 +119,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
     private static final int CHANNEL_MAX = 32767;
-    private final AmqpTransport amqpTransport;
     private static final Symbol COPY = Symbol.getSymbol("copy");
     private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
     private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
@@ -122,6 +126,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Symbol JMS_MAPPING_VERSION = Symbol.valueOf("x-opt-jms-mapping-version");
     private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
 
+    private final AmqpTransport amqpTransport;
+    private final BrokerService brokerService;
+
     protected int prefetch;
     protected int producerCredit;
     protected Transport protonTransport = Proton.transport();
@@ -129,8 +136,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     protected Collector eventCollector = new CollectorImpl();
     protected boolean useByteDestinationTypeAnnotation;
 
-    public AmqpProtocolConverter(AmqpTransport transport) {
+    public AmqpProtocolConverter(AmqpTransport transport, BrokerService brokerService) {
         this.amqpTransport = transport;
+        this.brokerService = brokerService;
 
         // the configured maxFrameSize on the URI.
         int maxFrameSize = transport.getWireFormat().getMaxAmqpFrameSize();
@@ -1468,4 +1476,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     public void setProducerCredit(int producerCredit) {
         this.producerCredit = producerCredit;
     }
+
+    @SuppressWarnings("unused")
+    private List<SubscriptionInfo> lookupSubscriptions() throws AmqpProtocolException {
+        List<SubscriptionInfo> subscriptions = Collections.emptyList();
+        try {
+            subscriptions = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId());
+        } catch (IOException e) {
+            throw new AmqpProtocolException("Error loading store subscriptions", true, e);
+        }
+
+        return subscriptions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
index e3680c5..9a01f7b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import org.fusesource.hawtbuf.Buffer;
 
 /**
+ *
  */
 public class AmqpSupport {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
index e394c85..3ca8ea1 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.MutexTransport;
@@ -33,7 +32,7 @@ import org.apache.activemq.wireformat.WireFormat;
  */
 public class AmqpTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
 
-    private BrokerContext brokerContext = null;
+    private BrokerService brokerService = null;
 
     @Override
     protected String getDefaultWireFormatType() {
@@ -43,14 +42,14 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
     @Override
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new AmqpTransportFilter(transport, format, brokerContext);
+        transport = new AmqpTransportFilter(transport, format, brokerService);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
 
     @Override
     public void setBrokerService(BrokerService brokerService) {
-        this.brokerContext = brokerService.getBrokerContext();
+        this.brokerService = brokerService;
     }
 
     @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index ec63ae7..5fb7a04 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -20,7 +20,7 @@ import java.io.IOException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
@@ -49,9 +49,9 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
     private final ReentrantLock lock = new ReentrantLock();
 
-    public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
+    public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) {
         super(next);
-        this.protocolConverter = new AMQPProtocolDiscriminator(this);
+        this.protocolConverter = new AMQPProtocolDiscriminator(this, brokerService);
         if (wireFormat instanceof AmqpWireFormat) {
             this.wireFormat = (AmqpWireFormat) wireFormat;
         }