You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/07/02 21:36:25 UTC

svn commit: r1356431 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/transport/stomp/

Author: tabish
Date: Mon Jul  2 19:36:24 2012
New Revision: 1356431

URL: http://svn.apache.org/viewvc?rev=1356431&view=rev
Log:
fix and tests for: https://issues.apache.org/jira/browse/AMQ-3909

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1356431&r1=1356430&r2=1356431&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jul  2 19:36:24 2012
@@ -808,4 +808,14 @@ public abstract class PrefetchSubscripti
     protected int getPrefetchExtension() {
         return this.prefetchExtension.get();
     }
+
+    @Override
+    public void setPrefetchSize(int prefetchSize) {
+        this.info.setPrefetchSize(prefetchSize);
+        try {
+            this.dispatchPending();
+        } catch (Exception e) {
+            LOG.trace("Caught exception during dispatch after prefetch change.", e);
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1356431&r1=1356430&r2=1356431&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Jul  2 19:36:24 2012
@@ -18,7 +18,6 @@ package org.apache.activemq.broker.regio
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -148,35 +147,35 @@ public class Topic extends BaseDestinati
         } else {
             DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
             super.addSubscription(context, sub);
-    		sub.add(context, this);
-    		if(dsub.isActive()) {
-	        	synchronized (consumers) {
-	        		boolean hasSubscription = false;
-	
-	        		if(consumers.size()==0) {
-	            		hasSubscription = false;
-	        		} else {
-		        		for(Subscription currentSub : consumers) {
-		        			if(currentSub.getConsumerInfo().isDurable()) {
-		        	            DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
-		        	            if(dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
-		        	            	hasSubscription = true;
-		        	            	break;
-		        	            }
-		        			}
-		        		}
-	        		}
-	        		
-	                if(!hasSubscription)
-	                	consumers.add(sub);
-	            }
-    		}
+            sub.add(context, this);
+            if(dsub.isActive()) {
+                synchronized (consumers) {
+                    boolean hasSubscription = false;
+
+                    if (consumers.size() == 0) {
+                        hasSubscription = false;
+                    } else {
+                        for (Subscription currentSub : consumers) {
+                            if (currentSub.getConsumerInfo().isDurable()) {
+                                DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
+                                if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
+                                    hasSubscription = true;
+                                    break;
+                                }
+                            }
+                        }
+                    }
+
+                    if (!hasSubscription) {
+                        consumers.add(sub);
+                    }
+                }
+            }
             durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
         }
     }
 
-    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
-            throws Exception {
+    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
         if (!sub.getConsumerInfo().isDurable()) {
             super.removeSubscription(context, sub, lastDeliveredSequenceId);
             synchronized (consumers) {
@@ -332,9 +331,7 @@ public class Topic extends BaseDestinati
                             + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
 
-                // We can avoid blocking due to low usage if the producer is
-                // sending
-                // a sync message or
+                // We can avoid blocking due to low usage if the producer is sending a sync message or
                 // if it is using a producer window
                 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                     synchronized (messagesWaitingForSpace) {
@@ -378,10 +375,8 @@ public class Topic extends BaseDestinati
                     }
 
                 } else {
-                    // Producer flow control cannot be used, so we have do the
-                    // flow
-                    // control at the broker
-                    // by blocking this thread until there is space available.
+                    // Producer flow control cannot be used, so we have do the flow control
+                    // at the broker by blocking this thread until there is space available.
 
                     if (memoryUsage.isFull()) {
                         if (context.isInTransaction()) {
@@ -763,17 +758,6 @@ public class Topic extends BaseDestinati
         }
     }
 
-
-    private void clearPendingMessages(SubscriptionKey subscriptionKey) {
-        dispatchLock.readLock().lock();
-        try {
-            DurableTopicSubscription durableTopicSubscription = durableSubcribers.get(subscriptionKey);
-            clearPendingAndDispatch(durableTopicSubscription);
-        } finally {
-            dispatchLock.readLock().unlock();
-        }
-    }
-
     private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
         synchronized (durableTopicSubscription.pendingLock) {
             durableTopicSubscription.pending.clear();
@@ -790,5 +774,4 @@ public class Topic extends BaseDestinati
     public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
         return durableSubcribers;
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1356431&r1=1356430&r2=1356431&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Jul  2 19:36:24 2012
@@ -19,7 +19,9 @@ package org.apache.activemq.broker.regio
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicLong;
+
 import javax.jms.JMSException;
+
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -45,11 +47,11 @@ public class TopicSubscription extends A
 
     private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
     private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
-    
+
     protected PendingMessageCursor matched;
     protected final SystemUsage usageManager;
     protected AtomicLong dispatchedCounter = new AtomicLong();
-       
+
     boolean singleDestination = true;
     Destination destination;
 
@@ -99,9 +101,9 @@ public class TopicSubscription extends A
             dispatch(node);
             setSlowConsumer(false);
         } else {
-            if ( info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize() ) {
-                //we are slow
-                if(!isSlowConsumer()) {
+            if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
+                // Slow consumers should log and set their state as such.
+                if (!isSlowConsumer()) {
                     LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
                     setSlowConsumer(true);
                     for (Destination dest: destinations) {
@@ -131,15 +133,14 @@ public class TopicSubscription extends A
                             }
                             matchedListMutex.wait(20);
                         }
-                        //Temporary storage could be full - so just try to add the message
-                        //see https://issues.apache.org/activemq/browse/AMQ-2475
+                        // Temporary storage could be full - so just try to add the message
+                        // see https://issues.apache.org/activemq/browse/AMQ-2475
                         if (matched.tryAddMessageLast(node, 10)) {
                             break;
                         }
                     }
                 }
                 synchronized (matchedListMutex) {
-                    
                     // NOTE - be careful about the slaveBroker!
                     if (maximumPendingMessages > 0) {
                         // calculate the high water mark from which point we
@@ -154,28 +155,26 @@ public class TopicSubscription extends A
                         // lets discard old messages as we are a slow consumer
                         while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
                             int pageInSize = matched.size() - maximumPendingMessages;
-                            // only page in a 1000 at a time - else we could
-                            // blow da memory
+                            // only page in a 1000 at a time - else we could blow the memory
                             pageInSize = Math.max(1000, pageInSize);
                             LinkedList<MessageReference> list = null;
                             MessageReference[] oldMessages=null;
                             synchronized(matched){
                                 list = matched.pageInList(pageInSize);
-                            	oldMessages = messageEvictionStrategy.evictMessages(list);
-                            	for (MessageReference ref : list) {
-                            	    ref.decrementReferenceCount();
-                            	}
+                                oldMessages = messageEvictionStrategy.evictMessages(list);
+                                for (MessageReference ref : list) {
+                                    ref.decrementReferenceCount();
+                                }
                             }
                             int messagesToEvict = 0;
                             if (oldMessages != null){
-	                            messagesToEvict = oldMessages.length;
-	                            for (int i = 0; i < messagesToEvict; i++) {
-	                                MessageReference oldMessage = oldMessages[i];
-	                                discard(oldMessage);
-	                            }
+                                messagesToEvict = oldMessages.length;
+                                for (int i = 0; i < messagesToEvict; i++) {
+                                    MessageReference oldMessage = oldMessages[i];
+                                    discard(oldMessage);
+                                }
                             }
-                            // lets avoid an infinite loop if we are given a bad
-                            // eviction strategy
+                            // lets avoid an infinite loop if we are given a bad eviction strategy
                             // for a bad strategy lets just not evict
                             if (messagesToEvict == 0) {
                                 LOG.warn("No messages to evict returned for "  + destination + " from eviction strategy: " + messageEvictionStrategy + " out of " + list.size() + " candidates");
@@ -205,7 +204,7 @@ public class TopicSubscription extends A
     /**
      * Discard any expired messages from the matched list. Called from a
      * synchronized block.
-     * 
+     *
      * @throws IOException
      */
     protected void removeExpiredMessages() throws IOException {
@@ -275,12 +274,11 @@ public class TopicSubscription extends A
             dispatchMatched();
             return;
         } else if (ack.isDeliveredAck()) {
-            // Message was delivered but not acknowledged: update pre-fetch
-            // counters.
+            // Message was delivered but not acknowledged: update pre-fetch counters.
             // also. get these for a consumer expired message.
             if (destination != null && !ack.isInTransaction()) {
                 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
-                destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());   
+                destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
             }
             dequeueCounter.addAndGet(ack.getMessageCount());
             dispatchMatched();
@@ -375,36 +373,35 @@ public class TopicSubscription extends A
     public int getMaxAuditDepth() {
         return maxAuditDepth;
     }
-    
+
     public synchronized void setMaxAuditDepth(int maxAuditDepth) {
         this.maxAuditDepth = maxAuditDepth;
         if (audit != null) {
             audit.setAuditDepth(maxAuditDepth);
         }
     }
-    
+
     public boolean isEnableAudit() {
         return enableAudit;
     }
 
     public synchronized void setEnableAudit(boolean enableAudit) {
         this.enableAudit = enableAudit;
-        if (enableAudit && audit==null) {
+        if (enableAudit && audit == null) {
             audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
         }
     }
-    
+
     // Implementation methods
     // -------------------------------------------------------------------------
     public boolean isFull() {
-        return getDispatchedQueueSize()  >= info.getPrefetchSize();
+        return getDispatchedQueueSize() >= info.getPrefetchSize();
     }
-    
+
     public int getInFlightSize() {
         return getDispatchedQueueSize();
     }
-    
-    
+
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
@@ -456,7 +453,7 @@ public class TopicSubscription extends A
 
     /**
      * inform the MessageConsumer on the client to change it's prefetch
-     * 
+     *
      * @param newPrefetch
      */
     public void updateConsumerPrefetch(int newPrefetch) {
@@ -468,18 +465,17 @@ public class TopicSubscription extends A
         }
     }
 
-    private void dispatchMatched() throws IOException {       
+    private void dispatchMatched() throws IOException {
         synchronized (matchedListMutex) {
             if (!matched.isEmpty() && !isFull()) {
                 try {
                     matched.reset();
-                   
+
                     while (matched.hasNext() && !isFull()) {
                         MessageReference message = matched.next();
                         message.decrementReferenceCount();
                         matched.remove();
-                        // Message may have been sitting in the matched list a
-                        // while
+                        // Message may have been sitting in the matched list a while
                         // waiting for the consumer to ak the message.
                         if (message.isExpired()) {
                             discard(message);
@@ -503,8 +499,7 @@ public class TopicSubscription extends A
         md.setConsumerId(info.getConsumerId());
         md.setDestination(node.getRegionDestination().getActiveMQDestination());
         dispatchedCounter.incrementAndGet();
-        // Keep track if this subscription is receiving messages from a single
-        // destination.
+        // Keep track if this subscription is receiving messages from a single destination.
         if (singleDestination) {
             if (destination == null) {
                 destination = node.getRegionDestination();
@@ -572,4 +567,13 @@ public class TopicSubscription extends A
         return info.getPrefetchSize();
     }
 
+    @Override
+    public void setPrefetchSize(int newSize) {
+        info.setPrefetchSize(newSize);
+        try {
+            dispatchMatched();
+        } catch(Exception e) {
+            LOG.trace("Caught exception on dispatch after prefetch size change.");
+        }
+    }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java?rev=1356431&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java Mon Jul  2 19:36:24 2012
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.UUID;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StompMissingMessageTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StompMissingMessageTest.class);
+
+    protected String bindAddress = "stomp://localhost:61613";
+    protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
+    protected String jmsUri = "vm://localhost";
+
+    private BrokerService broker;
+    protected String destination;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = BrokerFactory.createBroker(new URI(confUri));
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+        broker.waitUntilStarted();
+
+        destination = "/topic/" + getTopicName();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void testProducerConsumerLoop() throws Exception {
+        final int ITERATIONS = 500;
+        int received = 0;
+
+        for (int i = 1; i <= ITERATIONS*2; i+=2) {
+            if (doTestProducerConsumer(i) != null) {
+                received++;
+            }
+        }
+
+        assertEquals(ITERATIONS, received);
+    }
+
+    public String doTestProducerConsumer(int index) throws Exception {
+        String message = null;
+
+        assertEquals("Should not be any consumers", 0, broker.getAdminView().getTopicSubscribers().length);
+
+        StompConnection producer = stompConnect();
+        StompConnection consumer = stompConnect();
+
+        subscribe(consumer, Integer.toString(index));
+
+        sendMessage(producer, index);
+
+        try {
+            StompFrame frame = consumer.receive();
+            LOG.debug("Consumer got frame: " + message);
+            assertEquals(index, (int) Integer.valueOf(frame.getBody()));
+            message = frame.getBody();
+        } catch(Exception e) {
+            fail("Consumer["+index+"] got error while consuming: " + e.getMessage());
+        }
+
+        unsubscribe(consumer, Integer.toString(index));
+
+        stompDisconnect(consumer);
+        stompDisconnect(producer);
+
+        return message;
+    }
+
+    @Test
+    public void testProducerDurableConsumerLoop() throws Exception {
+        final int ITERATIONS = 500;
+        int received = 0;
+
+        for (int i = 1; i <= ITERATIONS*2; i+=2) {
+            if (doTestProducerDurableConsumer(i) != null) {
+                received++;
+            }
+        }
+
+        assertEquals(ITERATIONS, received);
+    }
+
+    public String doTestProducerDurableConsumer(int index) throws Exception {
+        String message = null;
+
+        assertEquals("Should not be any consumers", 0, broker.getAdminView().getTopicSubscribers().length);
+
+        StompConnection producer = stompConnect();
+        StompConnection consumer = stompConnect("test");
+
+        subscribe(consumer, Integer.toString(index), true);
+
+        sendMessage(producer, index);
+
+        try {
+            StompFrame frame = consumer.receive();
+            LOG.debug("Consumer got frame: " + message);
+            assertEquals(index, (int) Integer.valueOf(frame.getBody()));
+            message = frame.getBody();
+        } catch(Exception e) {
+            fail("Consumer["+index+"] got error while consuming: " + e.getMessage());
+        }
+
+        unsubscribe(consumer, Integer.toString(index));
+
+        stompDisconnect(consumer);
+        stompDisconnect(producer);
+
+        return message;
+    }
+
+    protected void subscribe(StompConnection stompConnection, String subscriptionId) throws Exception {
+        subscribe(stompConnection, subscriptionId, false);
+    }
+
+    protected void subscribe(StompConnection stompConnection, String subscriptionId, boolean durable) throws Exception {
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("id", subscriptionId);
+        if (durable) {
+            headers.put("activemq.subscriptionName", subscriptionId);
+        }
+        headers.put(Stomp.Headers.RECEIPT_REQUESTED, UUID.randomUUID().toString());
+
+        stompConnection.subscribe(destination, "auto", headers);
+
+        StompFrame received = stompConnection.receive();
+        assertEquals("RECEIPT", received.getAction());
+        String receipt = received.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID);
+        assertEquals(headers.get(Stomp.Headers.RECEIPT_REQUESTED), receipt);
+    }
+
+    protected void unsubscribe(StompConnection stompConnection, String subscriptionId) throws Exception {
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("id", subscriptionId);
+        headers.put(Stomp.Headers.RECEIPT_REQUESTED, UUID.randomUUID().toString());
+
+        stompConnection.unsubscribe(destination, headers);
+
+        StompFrame received = stompConnection.receive();
+        assertEquals("RECEIPT", received.getAction());
+        String receipt = received.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID);
+        assertEquals(headers.get(Stomp.Headers.RECEIPT_REQUESTED), receipt);
+    }
+
+    protected void sendMessage(StompConnection producer, int index) throws Exception {
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put(Stomp.Headers.RECEIPT_REQUESTED, UUID.randomUUID().toString());
+
+        producer.send(destination, Integer.toString(index), null, headers);
+
+        StompFrame received = producer.receive();
+        assertEquals("RECEIPT", received.getAction());
+        String receipt = received.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID);
+        assertEquals(headers.get(Stomp.Headers.RECEIPT_REQUESTED), receipt);
+    }
+
+    protected StompConnection stompConnect() throws Exception {
+        return stompConnect(null);
+    }
+
+    protected StompConnection stompConnect(String clientId) throws Exception {
+        StompConnection stompConnection = new StompConnection();
+        URI connectUri = new URI(bindAddress);
+        stompConnection.open(createSocket(connectUri));
+        stompConnection.connect("system", "manager", clientId);
+        return stompConnection;
+    }
+
+    protected Socket createSocket(URI connectUri) throws IOException {
+        return new Socket("127.0.0.1", connectUri.getPort());
+    }
+
+    protected String getTopicName() {
+        return getClass().getName() + ".Messages";
+    }
+
+    protected void stompDisconnect(StompConnection connection) throws Exception {
+        if (connection != null) {
+            String receiptId = UUID.randomUUID().toString();
+            connection.disconnect(receiptId);
+            if (!connection.receive().getAction().equals(Stomp.Responses.RECEIPT)) {
+                throw new Exception("Failed to receive receipt for disconnect.");
+            }
+            connection.close();
+            connection = null;
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native