You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/07 18:13:19 UTC

svn commit: r1479963 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java

Author: tabish
Date: Tue May  7 16:13:18 2013
New Revision: 1479963

URL: http://svn.apache.org/r1479963
Log:
Fix and test for: https://issues.apache.org/jira/browse/AMQ-4513

Makes the locking in RegionBroker a bit more fine grained.  We hold a lock only for a short time and allow destination adds that aren't on the same destination to continue on concurrently.  

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1479963&r1=1479962&r2=1479963&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue May  7 16:13:18 2013
@@ -43,7 +43,23 @@ import org.apache.activemq.broker.Produc
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.store.PListStore;
 import org.apache.activemq.thread.Scheduler;
@@ -59,8 +75,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Routes Broker operations to the correct messaging regions for processing.
- *
- *
  */
 public class RegionBroker extends EmptyBroker {
     public static final String ORIGINAL_EXPIRATION = "originalExpiration";
@@ -80,6 +94,7 @@ public class RegionBroker extends EmptyB
     private boolean keepDurableSubsActive;
 
     private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
+    private final Map<ActiveMQDestination, ActiveMQDestination> destinationGate = new HashMap<ActiveMQDestination, ActiveMQDestination>();
     private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
     private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();
 
@@ -102,9 +117,9 @@ public class RegionBroker extends EmptyB
     };
 
     public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
-                        DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
+        DestinationInterceptor destinationInterceptor, Scheduler scheduler, ThreadPoolExecutor executor) throws IOException {
         this.brokerService = brokerService;
-        this.executor=executor;
+        this.executor = executor;
         this.scheduler = scheduler;
         if (destinationFactory == null) {
             throw new IllegalArgumentException("null destinationFactory");
@@ -126,7 +141,7 @@ public class RegionBroker extends EmptyB
     }
 
     @Override
-    public Set <Destination> getDestinations(ActiveMQDestination destination) {
+    public Set<Destination> getDestinations(ActiveMQDestination destination) {
         try {
             return getRegion(destination).getDestinations(destination);
         } catch (JMSException jmse) {
@@ -216,7 +231,7 @@ public class RegionBroker extends EmptyB
             ConnectionContext oldContext = clientIdSet.get(clientId);
             if (oldContext != null) {
                 throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
-                                                   + oldContext.getConnection().getRemoteAddress());
+                    + oldContext.getConnection().getRemoteAddress());
             } else {
                 clientIdSet.put(clientId, context);
             }
@@ -267,21 +282,42 @@ public class RegionBroker extends EmptyB
             return answer;
         }
 
-     synchronized (destinations) {
-        answer = destinations.get(destination);
-        if (answer != null) {
-            return answer;
+        synchronized (destinationGate) {
+            answer = destinations.get(destination);
+            if (answer != null) {
+                return answer;
+            }
+
+            if (destinationGate.get(destination) != null) {
+                // Guard against spurious wakeup.
+                while (destinationGate.containsKey(destination)) {
+                    destinationGate.wait();
+                }
+                answer = destinations.get(destination);
+                if (answer != null) {
+                    return answer;
+                } else {
+                    // In case of intermediate remove or add failure
+                    destinationGate.put(destination, destination);
+                }
+            }
         }
 
-        boolean create = true;
-        if (destination.isTemporary())
-            create = createIfTemp;
-        answer = getRegion(destination).addDestination(context, destination, create);
+        try {
+            boolean create = true;
+            if (destination.isTemporary()) {
+                create = createIfTemp;
+            }
+            answer = getRegion(destination).addDestination(context, destination, create);
+            destinations.put(destination, answer);
+        } finally {
+            synchronized (destinationGate) {
+                destinationGate.remove(destination);
+                destinationGate.notifyAll();
+            }
+        }
 
-        destinations.put(destination, answer);
         return answer;
-     }
-
     }
 
     @Override
@@ -294,14 +330,13 @@ public class RegionBroker extends EmptyB
 
     @Override
     public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
-        addDestination(context, info.getDestination(),true);
+        addDestination(context, info.getDestination(), true);
 
     }
 
     @Override
     public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
         removeDestination(context, info.getDestination(), info.getTimeout());
-
     }
 
     @Override
@@ -384,9 +419,10 @@ public class RegionBroker extends EmptyB
         ActiveMQDestination destination = message.getDestination();
         message.setBrokerInTime(System.currentTimeMillis());
         if (producerExchange.isMutable() || producerExchange.getRegion() == null
-                || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
+            || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
             // ensure the destination is registered with the RegionBroker
-            producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
+            producerExchange.getConnectionContext().getBroker()
+                .addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
             producerExchange.setRegion(getRegion(destination));
             producerExchange.setRegionDestination(null);
         }
@@ -412,16 +448,16 @@ public class RegionBroker extends EmptyB
 
     protected Region getRegion(ActiveMQDestination destination) throws JMSException {
         switch (destination.getDestinationType()) {
-        case ActiveMQDestination.QUEUE_TYPE:
-            return queueRegion;
-        case ActiveMQDestination.TOPIC_TYPE:
-            return topicRegion;
-        case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            return tempQueueRegion;
-        case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            return tempTopicRegion;
-        default:
-            throw createUnknownDestinationTypeException(destination);
+            case ActiveMQDestination.QUEUE_TYPE:
+                return queueRegion;
+            case ActiveMQDestination.TOPIC_TYPE:
+                return topicRegion;
+            case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                return tempQueueRegion;
+            case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                return tempTopicRegion;
+            default:
+                throw createUnknownDestinationTypeException(destination);
         }
     }
 
@@ -523,7 +559,7 @@ public class RegionBroker extends EmptyB
         if (info != null) {
             BrokerInfo existing = brokerInfos.get(info.getBrokerId());
             if (existing != null && existing.decrementRefCount() == 0) {
-               brokerInfos.remove(info.getBrokerId());
+                brokerInfos.remove(info.getBrokerId());
             }
             if (LOG.isDebugEnabled()) {
                 LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
@@ -551,7 +587,7 @@ public class RegionBroker extends EmptyB
             message.setBrokerOutTime(endTime);
             if (getBrokerService().isEnableStatistics()) {
                 long totalTime = endTime - message.getBrokerInTime();
-                ((Destination)message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
+                ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
             }
         }
     }
@@ -589,7 +625,7 @@ public class RegionBroker extends EmptyB
 
     public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
         this.keepDurableSubsActive = keepDurableSubsActive;
-        ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
+        ((TopicRegion) topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
     }
 
     public DestinationInterceptor getDestinationInterceptor() {
@@ -647,16 +683,15 @@ public class RegionBroker extends EmptyB
     }
 
     private boolean stampAsExpired(Message message) throws IOException {
-        boolean stamped=false;
+        boolean stamped = false;
         if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
-            long expiration=message.getExpiration();
-            message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
+            long expiration = message.getExpiration();
+            message.setProperty(ORIGINAL_EXPIRATION, new Long(expiration));
             stamped = true;
         }
         return stamped;
     }
 
-
     @Override
     public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
         if (LOG.isDebugEnabled()) {
@@ -666,47 +701,42 @@ public class RegionBroker extends EmptyB
     }
 
     @Override
-    public void sendToDeadLetterQueue(ConnectionContext context,
-            MessageReference node, Subscription subscription){
-        try{
-            if(node!=null){
-                Message message=node.getMessage();
-                if(message!=null && node.getRegionDestination()!=null){
-                    DeadLetterStrategy deadLetterStrategy=((Destination)node
-                            .getRegionDestination()).getDeadLetterStrategy();
-                    if(deadLetterStrategy!=null){
-                        if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
+    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription) {
+        try {
+            if (node != null) {
+                Message message = node.getMessage();
+                if (message != null && node.getRegionDestination() != null) {
+                    DeadLetterStrategy deadLetterStrategy = ((Destination) node.getRegionDestination()).getDeadLetterStrategy();
+                    if (deadLetterStrategy != null) {
+                        if (deadLetterStrategy.isSendToDeadLetterQueue(message)) {
                             // message may be inflight to other subscriptions so do not modify
                             message = message.copy();
                             stampAsExpired(message);
                             message.setExpiration(0);
-                            if(!message.isPersistent()){
+                            if (!message.isPersistent()) {
                                 message.setPersistent(true);
-                                message.setProperty("originalDeliveryMode",
-                                        "NON_PERSISTENT");
+                                message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
                             }
                             // The original destination and transaction id do
                             // not get filled when the message is first sent,
                             // it is only populated if the message is routed to
                             // another destination like the DLQ
-                            ActiveMQDestination deadLetterDestination=deadLetterStrategy
-                                    .getDeadLetterQueueFor(message, subscription);
-                            if (context.getBroker()==null) {
+                            ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription);
+                            if (context.getBroker() == null) {
                                 context.setBroker(getRoot());
                             }
-                            BrokerSupport.resendNoCopy(context,message,
-                                    deadLetterDestination);
+                            BrokerSupport.resendNoCopy(context, message, deadLetterDestination);
                         }
                     } else {
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
-                                    + message.getMessageId() + ", destination: " + message.getDestination());
+                            LOG.debug("Dead Letter message with no DLQ strategy in place, message id: " + message.getMessageId() + ", destination: "
+                                + message.getDestination());
                         }
                     }
                 }
             }
-        }catch(Exception e){
-            LOG.warn("Caught an exception sending to DLQ: "+node,e);
+        } catch (Exception e) {
+            LOG.warn("Caught an exception sending to DLQ: " + node, e);
         }
     }
 
@@ -725,12 +755,11 @@ public class RegionBroker extends EmptyB
      */
     @Override
     public long getBrokerSequenceId() {
-        synchronized(sequenceGenerator) {
+        synchronized (sequenceGenerator) {
             return sequenceGenerator.getNextSequenceId();
         }
     }
 
-
     @Override
     public Scheduler getScheduler() {
         return this.scheduler;
@@ -747,7 +776,7 @@ public class RegionBroker extends EmptyB
         try {
             getRegion(destination).processConsumerControl(consumerExchange, control);
         } catch (JMSException jmse) {
-            LOG.warn("unmatched destination: " + destination + ", in consumerControl: "  + control);
+            LOG.warn("unmatched destination: " + destination + ", in consumerControl: " + control);
         }
     }
 
@@ -801,8 +830,7 @@ public class RegionBroker extends EmptyB
                     if (dest instanceof BaseDestination) {
                         log = ((BaseDestination) dest).getLog();
                     }
-                    log.info(dest.getName() + " Inactive for longer than " +
-                             dest.getInactiveTimoutBeforeGC() + " ms - removing ...");
+                    log.info(dest.getName() + " Inactive for longer than " + dest.getInactiveTimoutBeforeGC() + " ms - removing ...");
                     try {
                         getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
                     } catch (Exception e) {

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java?rev=1479963&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java Tue May  7 16:13:18 2013
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ4513Test {
+
+    private BrokerService brokerService;
+    private String connectionUri;
+
+    @Before
+    public void setup() throws Exception {
+        brokerService = new BrokerService();
+
+        connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
+
+        // Configure Dead Letter Strategy
+        DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        ((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
+        ((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
+        strategy.setProcessNonPersistent(false);
+        strategy.setProcessExpired(false);
+
+        // Add policy and individual DLQ strategy
+        PolicyEntry policy = new PolicyEntry();
+        policy.setTimeBeforeDispatchStarts(3000);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        brokerService.setDestinationPolicy(pMap);
+
+        brokerService.setPersistent(false);
+        brokerService.start();
+    }
+
+    @After
+    public void stop() throws Exception {
+        brokerService.stop();
+    }
+
+    @Test(timeout=360000)
+    public void test() throws Exception {
+
+        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
+
+        ExecutorService service = Executors.newFixedThreadPool(25);
+
+        final Random ripple = new Random(System.currentTimeMillis());
+
+        for (int i = 0; i < 1000; ++i) {
+            service.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        Destination destination = session.createTemporaryQueue();
+                        session.createProducer(destination);
+                        connection.close();
+                        TimeUnit.MILLISECONDS.sleep(ripple.nextInt(20));
+                    } catch (Exception e) {
+                    }
+                }
+            });
+
+            service.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        Destination destination = session.createTemporaryQueue();
+                        MessageProducer producer = session.createProducer(destination);
+                        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                        producer.setTimeToLive(400);
+                        producer.send(session.createTextMessage());
+                        producer.send(session.createTextMessage());
+                        TimeUnit.MILLISECONDS.sleep(500);
+                        connection.close();
+                    } catch (Exception e) {
+                    }
+                }
+            });
+
+            service.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        Destination destination = session.createTemporaryQueue();
+                        session.createProducer(destination);
+                        connection.close();
+                        TimeUnit.MILLISECONDS.sleep(ripple.nextInt(20));
+                    } catch (Exception e) {
+                    }
+                }
+            });
+        }
+
+        service.shutdown();
+        assertTrue(service.awaitTermination(5, TimeUnit.MINUTES));
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java
------------------------------------------------------------------------------
    svn:eol-style = native