You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/05 18:41:07 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5560

Repository: activemq
Updated Branches:
  refs/heads/master ab28b771e -> ace101a03


https://issues.apache.org/jira/browse/AMQ-5560

Use the type descriptor and not the key, and also check the object type since
the Map from Proton does not enforce it.  Remove some dead code and unneeded
mutex locks for id generation.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ace101a0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ace101a0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ace101a0

Branch: refs/heads/master
Commit: ace101a03aea845e3e118abb0ba565aa97563191
Parents: ab28b77
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Mar 5 12:40:47 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Mar 5 12:40:47 2015 -0500

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 84 ++++++++++----------
 1 file changed, 43 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ace101a0/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index fa0e024..816008f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -31,6 +31,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Destination;
 import javax.jms.InvalidClientIDException;
@@ -87,6 +90,7 @@ import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Modified;
@@ -136,8 +140,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
     private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
     private static final Symbol COPY = Symbol.getSymbol("copy");
-    private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
-    private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
+    private static final UnsignedLong JMS_SELECTOR = UnsignedLong.valueOf(0x0000468C00000004L);
+    private static final UnsignedLong NO_LOCAL = UnsignedLong.valueOf(0x0000468C00000003L);
     private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
     private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
 
@@ -844,12 +848,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
     }
 
-    long nextTransactionId = 1;
-
-    class Transaction {
-    }
-
-    HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
+    private final AtomicLong nextTransactionId = new AtomicLong();
 
     public byte[] toBytes(long value) {
         Buffer buffer = new Buffer(8);
@@ -885,7 +884,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                     throw new Exception("don't know how to handle a declare /w a set GlobalId");
                 }
 
-                long txid = nextTransactionId++;
+                long txid = nextTransactionId.incrementAndGet();
                 TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
                 sendToActiveMQ(txinfo, null);
                 LOG.trace("started transaction {}", txid);
@@ -1404,7 +1403,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
     private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
 
-    @SuppressWarnings("rawtypes")
+    @SuppressWarnings("unchecked")
     void onSenderOpen(final Sender sender, final AmqpSessionContext sessionContext) {
         org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
 
@@ -1415,21 +1414,18 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
             String selector = null;
             if (source != null) {
-                Map filter = source.getFilter();
+                DescribedType filter = findFilter(source.getFilter(), JMS_SELECTOR);
                 if (filter != null) {
-                    DescribedType value = (DescribedType) filter.get(JMS_SELECTOR);
-                    if (value != null) {
-                        selector = value.getDescribed().toString();
-                        // Validate the Selector.
-                        try {
-                            SelectorParser.parse(selector);
-                        } catch (InvalidSelectorException e) {
-                            sender.setSource(null);
-                            sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
-                            sender.close();
-                            consumerContext.closed = true;
-                            return;
-                        }
+                    selector = filter.getDescribed().toString();
+                    // Validate the Selector.
+                    try {
+                        SelectorParser.parse(selector);
+                    } catch (InvalidSelectorException e) {
+                        sender.setSource(null);
+                        sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
+                        sender.close();
+                        consumerContext.closed = true;
+                        return;
                     }
                 }
             }
@@ -1507,12 +1503,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 consumerInfo.setSubscriptionName(sender.getName());
             }
 
-            Map filter = source.getFilter();
+            DescribedType filter = findFilter(source.getFilter(), NO_LOCAL);
             if (filter != null) {
-                DescribedType value = (DescribedType) filter.get(NO_LOCAL);
-                if (value != null) {
-                    consumerInfo.setNoLocal(true);
-                }
+                consumerInfo.setNoLocal(true);
             }
 
             sendToActiveMQ(consumerInfo, new ResponseHandler() {
@@ -1623,7 +1616,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     }
 
     private boolean contains(Symbol[] symbols, Symbol key) {
-        if (symbols == null) {
+        if (symbols == null || symbols.length == 0) {
             return false;
         }
 
@@ -1636,25 +1629,34 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         return false;
     }
 
+    private DescribedType findFilter(Map<Symbol, Object> filters, UnsignedLong filterId) {
+        if (filters == null || filters.isEmpty()) {
+            return null;
+        }
+
+        for (Object value : filters.values()) {
+            if (value instanceof DescribedType) {
+                DescribedType describedType = (DescribedType) value;
+                if (describedType.getDescriptor().equals(filterId)) {
+                    return describedType;
+                }
+            }
+        }
+
+        return null;
+    }
+
     // //////////////////////////////////////////////////////////////////////////
     //
     // Implementation methods
     //
     // //////////////////////////////////////////////////////////////////////////
 
-    private final Object commnadIdMutex = new Object();
-    private int lastCommandId;
-
-    int generateCommandId() {
-        synchronized (commnadIdMutex) {
-            return lastCommandId++;
-        }
-    }
-
-    private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
+    private final AtomicInteger lastCommandId = new AtomicInteger();
+    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
 
     void sendToActiveMQ(Command command, ResponseHandler handler) {
-        command.setCommandId(generateCommandId());
+        command.setCommandId(lastCommandId.incrementAndGet());
         if (handler != null) {
             command.setResponseRequired(true);
             resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);