You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/07/02 21:36:25 UTC
svn commit: r1356431 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/transport/stomp/
Author: tabish
Date: Mon Jul 2 19:36:24 2012
New Revision: 1356431
URL: http://svn.apache.org/viewvc?rev=1356431&view=rev
Log:
fix and tests for: https://issues.apache.org/jira/browse/AMQ-3909
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1356431&r1=1356430&r2=1356431&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jul 2 19:36:24 2012
@@ -808,4 +808,14 @@ public abstract class PrefetchSubscripti
protected int getPrefetchExtension() {
return this.prefetchExtension.get();
}
+
+ @Override
+ public void setPrefetchSize(int prefetchSize) {
+ this.info.setPrefetchSize(prefetchSize);
+ try {
+ this.dispatchPending();
+ } catch (Exception e) {
+ LOG.trace("Caught exception during dispatch after prefetch change.", e);
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1356431&r1=1356430&r2=1356431&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Jul 2 19:36:24 2012
@@ -18,7 +18,6 @@ package org.apache.activemq.broker.regio
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -148,35 +147,35 @@ public class Topic extends BaseDestinati
} else {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
super.addSubscription(context, sub);
- sub.add(context, this);
- if(dsub.isActive()) {
- synchronized (consumers) {
- boolean hasSubscription = false;
-
- if(consumers.size()==0) {
- hasSubscription = false;
- } else {
- for(Subscription currentSub : consumers) {
- if(currentSub.getConsumerInfo().isDurable()) {
- DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
- if(dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
- hasSubscription = true;
- break;
- }
- }
- }
- }
-
- if(!hasSubscription)
- consumers.add(sub);
- }
- }
+ sub.add(context, this);
+ if(dsub.isActive()) {
+ synchronized (consumers) {
+ boolean hasSubscription = false;
+
+ if (consumers.size() == 0) {
+ hasSubscription = false;
+ } else {
+ for (Subscription currentSub : consumers) {
+ if (currentSub.getConsumerInfo().isDurable()) {
+ DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
+ if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
+ hasSubscription = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!hasSubscription) {
+ consumers.add(sub);
+ }
+ }
+ }
durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
}
}
- public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
- throws Exception {
+ public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
if (!sub.getConsumerInfo().isDurable()) {
super.removeSubscription(context, sub, lastDeliveredSequenceId);
synchronized (consumers) {
@@ -332,9 +331,7 @@ public class Topic extends BaseDestinati
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
- // We can avoid blocking due to low usage if the producer is
- // sending
- // a sync message or
+ // We can avoid blocking due to low usage if the producer is sending a sync message or
// if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
synchronized (messagesWaitingForSpace) {
@@ -378,10 +375,8 @@ public class Topic extends BaseDestinati
}
} else {
- // Producer flow control cannot be used, so we have do the
- // flow
- // control at the broker
- // by blocking this thread until there is space available.
+ // Producer flow control cannot be used, so we have do the flow control
+ // at the broker by blocking this thread until there is space available.
if (memoryUsage.isFull()) {
if (context.isInTransaction()) {
@@ -763,17 +758,6 @@ public class Topic extends BaseDestinati
}
}
-
- private void clearPendingMessages(SubscriptionKey subscriptionKey) {
- dispatchLock.readLock().lock();
- try {
- DurableTopicSubscription durableTopicSubscription = durableSubcribers.get(subscriptionKey);
- clearPendingAndDispatch(durableTopicSubscription);
- } finally {
- dispatchLock.readLock().unlock();
- }
- }
-
private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
synchronized (durableTopicSubscription.pendingLock) {
durableTopicSubscription.pending.clear();
@@ -790,5 +774,4 @@ public class Topic extends BaseDestinati
public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
return durableSubcribers;
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1356431&r1=1356430&r2=1356431&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Jul 2 19:36:24 2012
@@ -19,7 +19,9 @@ package org.apache.activemq.broker.regio
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
+
import javax.jms.JMSException;
+
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@@ -45,11 +47,11 @@ public class TopicSubscription extends A
private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
-
+
protected PendingMessageCursor matched;
protected final SystemUsage usageManager;
protected AtomicLong dispatchedCounter = new AtomicLong();
-
+
boolean singleDestination = true;
Destination destination;
@@ -99,9 +101,9 @@ public class TopicSubscription extends A
dispatch(node);
setSlowConsumer(false);
} else {
- if ( info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize() ) {
- //we are slow
- if(!isSlowConsumer()) {
+ if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
+ // Slow consumers should log and set their state as such.
+ if (!isSlowConsumer()) {
LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
setSlowConsumer(true);
for (Destination dest: destinations) {
@@ -131,15 +133,14 @@ public class TopicSubscription extends A
}
matchedListMutex.wait(20);
}
- //Temporary storage could be full - so just try to add the message
- //see https://issues.apache.org/activemq/browse/AMQ-2475
+ // Temporary storage could be full - so just try to add the message
+ // see https://issues.apache.org/activemq/browse/AMQ-2475
if (matched.tryAddMessageLast(node, 10)) {
break;
}
}
}
synchronized (matchedListMutex) {
-
// NOTE - be careful about the slaveBroker!
if (maximumPendingMessages > 0) {
// calculate the high water mark from which point we
@@ -154,28 +155,26 @@ public class TopicSubscription extends A
// lets discard old messages as we are a slow consumer
while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
int pageInSize = matched.size() - maximumPendingMessages;
- // only page in a 1000 at a time - else we could
- // blow da memory
+ // only page in a 1000 at a time - else we could blow the memory
pageInSize = Math.max(1000, pageInSize);
LinkedList<MessageReference> list = null;
MessageReference[] oldMessages=null;
synchronized(matched){
list = matched.pageInList(pageInSize);
- oldMessages = messageEvictionStrategy.evictMessages(list);
- for (MessageReference ref : list) {
- ref.decrementReferenceCount();
- }
+ oldMessages = messageEvictionStrategy.evictMessages(list);
+ for (MessageReference ref : list) {
+ ref.decrementReferenceCount();
+ }
}
int messagesToEvict = 0;
if (oldMessages != null){
- messagesToEvict = oldMessages.length;
- for (int i = 0; i < messagesToEvict; i++) {
- MessageReference oldMessage = oldMessages[i];
- discard(oldMessage);
- }
+ messagesToEvict = oldMessages.length;
+ for (int i = 0; i < messagesToEvict; i++) {
+ MessageReference oldMessage = oldMessages[i];
+ discard(oldMessage);
+ }
}
- // lets avoid an infinite loop if we are given a bad
- // eviction strategy
+ // lets avoid an infinite loop if we are given a bad eviction strategy
// for a bad strategy lets just not evict
if (messagesToEvict == 0) {
LOG.warn("No messages to evict returned for " + destination + " from eviction strategy: " + messageEvictionStrategy + " out of " + list.size() + " candidates");
@@ -205,7 +204,7 @@ public class TopicSubscription extends A
/**
* Discard any expired messages from the matched list. Called from a
* synchronized block.
- *
+ *
* @throws IOException
*/
protected void removeExpiredMessages() throws IOException {
@@ -275,12 +274,11 @@ public class TopicSubscription extends A
dispatchMatched();
return;
} else if (ack.isDeliveredAck()) {
- // Message was delivered but not acknowledged: update pre-fetch
- // counters.
+ // Message was delivered but not acknowledged: update pre-fetch counters.
// also. get these for a consumer expired message.
if (destination != null && !ack.isInTransaction()) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
- destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
+ destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
}
dequeueCounter.addAndGet(ack.getMessageCount());
dispatchMatched();
@@ -375,36 +373,35 @@ public class TopicSubscription extends A
public int getMaxAuditDepth() {
return maxAuditDepth;
}
-
+
public synchronized void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
if (audit != null) {
audit.setAuditDepth(maxAuditDepth);
}
}
-
+
public boolean isEnableAudit() {
return enableAudit;
}
public synchronized void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
- if (enableAudit && audit==null) {
+ if (enableAudit && audit == null) {
audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
}
}
-
+
// Implementation methods
// -------------------------------------------------------------------------
public boolean isFull() {
- return getDispatchedQueueSize() >= info.getPrefetchSize();
+ return getDispatchedQueueSize() >= info.getPrefetchSize();
}
-
+
public int getInFlightSize() {
return getDispatchedQueueSize();
}
-
-
+
/**
* @return true when 60% or more room is left for dispatching messages
*/
@@ -456,7 +453,7 @@ public class TopicSubscription extends A
/**
* inform the MessageConsumer on the client to change it's prefetch
- *
+ *
* @param newPrefetch
*/
public void updateConsumerPrefetch(int newPrefetch) {
@@ -468,18 +465,17 @@ public class TopicSubscription extends A
}
}
- private void dispatchMatched() throws IOException {
+ private void dispatchMatched() throws IOException {
synchronized (matchedListMutex) {
if (!matched.isEmpty() && !isFull()) {
try {
matched.reset();
-
+
while (matched.hasNext() && !isFull()) {
MessageReference message = matched.next();
message.decrementReferenceCount();
matched.remove();
- // Message may have been sitting in the matched list a
- // while
+ // Message may have been sitting in the matched list a while
// waiting for the consumer to ak the message.
if (message.isExpired()) {
discard(message);
@@ -503,8 +499,7 @@ public class TopicSubscription extends A
md.setConsumerId(info.getConsumerId());
md.setDestination(node.getRegionDestination().getActiveMQDestination());
dispatchedCounter.incrementAndGet();
- // Keep track if this subscription is receiving messages from a single
- // destination.
+ // Keep track if this subscription is receiving messages from a single destination.
if (singleDestination) {
if (destination == null) {
destination = node.getRegionDestination();
@@ -572,4 +567,13 @@ public class TopicSubscription extends A
return info.getPrefetchSize();
}
+ @Override
+ public void setPrefetchSize(int newSize) {
+ info.setPrefetchSize(newSize);
+ try {
+ dispatchMatched();
+ } catch(Exception e) {
+ LOG.trace("Caught exception on dispatch after prefetch size change.");
+ }
+ }
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java?rev=1356431&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java Mon Jul 2 19:36:24 2012
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.UUID;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StompMissingMessageTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StompMissingMessageTest.class);
+
+ protected String bindAddress = "stomp://localhost:61613";
+ protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
+ protected String jmsUri = "vm://localhost";
+
+ private BrokerService broker;
+ protected String destination;
+
+ @Before
+ public void setUp() throws Exception {
+ broker = BrokerFactory.createBroker(new URI(confUri));
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.start();
+ broker.waitUntilStarted();
+
+ destination = "/topic/" + getTopicName();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
+ @Test
+ public void testProducerConsumerLoop() throws Exception {
+ final int ITERATIONS = 500;
+ int received = 0;
+
+ for (int i = 1; i <= ITERATIONS*2; i+=2) {
+ if (doTestProducerConsumer(i) != null) {
+ received++;
+ }
+ }
+
+ assertEquals(ITERATIONS, received);
+ }
+
+ public String doTestProducerConsumer(int index) throws Exception {
+ String message = null;
+
+ assertEquals("Should not be any consumers", 0, broker.getAdminView().getTopicSubscribers().length);
+
+ StompConnection producer = stompConnect();
+ StompConnection consumer = stompConnect();
+
+ subscribe(consumer, Integer.toString(index));
+
+ sendMessage(producer, index);
+
+ try {
+ StompFrame frame = consumer.receive();
+ LOG.debug("Consumer got frame: " + message);
+ assertEquals(index, (int) Integer.valueOf(frame.getBody()));
+ message = frame.getBody();
+ } catch(Exception e) {
+ fail("Consumer["+index+"] got error while consuming: " + e.getMessage());
+ }
+
+ unsubscribe(consumer, Integer.toString(index));
+
+ stompDisconnect(consumer);
+ stompDisconnect(producer);
+
+ return message;
+ }
+
+ @Test
+ public void testProducerDurableConsumerLoop() throws Exception {
+ final int ITERATIONS = 500;
+ int received = 0;
+
+ for (int i = 1; i <= ITERATIONS*2; i+=2) {
+ if (doTestProducerDurableConsumer(i) != null) {
+ received++;
+ }
+ }
+
+ assertEquals(ITERATIONS, received);
+ }
+
+ public String doTestProducerDurableConsumer(int index) throws Exception {
+ String message = null;
+
+ assertEquals("Should not be any consumers", 0, broker.getAdminView().getTopicSubscribers().length);
+
+ StompConnection producer = stompConnect();
+ StompConnection consumer = stompConnect("test");
+
+ subscribe(consumer, Integer.toString(index), true);
+
+ sendMessage(producer, index);
+
+ try {
+ StompFrame frame = consumer.receive();
+ LOG.debug("Consumer got frame: " + message);
+ assertEquals(index, (int) Integer.valueOf(frame.getBody()));
+ message = frame.getBody();
+ } catch(Exception e) {
+ fail("Consumer["+index+"] got error while consuming: " + e.getMessage());
+ }
+
+ unsubscribe(consumer, Integer.toString(index));
+
+ stompDisconnect(consumer);
+ stompDisconnect(producer);
+
+ return message;
+ }
+
+ protected void subscribe(StompConnection stompConnection, String subscriptionId) throws Exception {
+ subscribe(stompConnection, subscriptionId, false);
+ }
+
+ protected void subscribe(StompConnection stompConnection, String subscriptionId, boolean durable) throws Exception {
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("id", subscriptionId);
+ if (durable) {
+ headers.put("activemq.subscriptionName", subscriptionId);
+ }
+ headers.put(Stomp.Headers.RECEIPT_REQUESTED, UUID.randomUUID().toString());
+
+ stompConnection.subscribe(destination, "auto", headers);
+
+ StompFrame received = stompConnection.receive();
+ assertEquals("RECEIPT", received.getAction());
+ String receipt = received.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID);
+ assertEquals(headers.get(Stomp.Headers.RECEIPT_REQUESTED), receipt);
+ }
+
+ protected void unsubscribe(StompConnection stompConnection, String subscriptionId) throws Exception {
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("id", subscriptionId);
+ headers.put(Stomp.Headers.RECEIPT_REQUESTED, UUID.randomUUID().toString());
+
+ stompConnection.unsubscribe(destination, headers);
+
+ StompFrame received = stompConnection.receive();
+ assertEquals("RECEIPT", received.getAction());
+ String receipt = received.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID);
+ assertEquals(headers.get(Stomp.Headers.RECEIPT_REQUESTED), receipt);
+ }
+
+ protected void sendMessage(StompConnection producer, int index) throws Exception {
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put(Stomp.Headers.RECEIPT_REQUESTED, UUID.randomUUID().toString());
+
+ producer.send(destination, Integer.toString(index), null, headers);
+
+ StompFrame received = producer.receive();
+ assertEquals("RECEIPT", received.getAction());
+ String receipt = received.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID);
+ assertEquals(headers.get(Stomp.Headers.RECEIPT_REQUESTED), receipt);
+ }
+
+ protected StompConnection stompConnect() throws Exception {
+ return stompConnect(null);
+ }
+
+ protected StompConnection stompConnect(String clientId) throws Exception {
+ StompConnection stompConnection = new StompConnection();
+ URI connectUri = new URI(bindAddress);
+ stompConnection.open(createSocket(connectUri));
+ stompConnection.connect("system", "manager", clientId);
+ return stompConnection;
+ }
+
+ protected Socket createSocket(URI connectUri) throws IOException {
+ return new Socket("127.0.0.1", connectUri.getPort());
+ }
+
+ protected String getTopicName() {
+ return getClass().getName() + ".Messages";
+ }
+
+ protected void stompDisconnect(StompConnection connection) throws Exception {
+ if (connection != null) {
+ String receiptId = UUID.randomUUID().toString();
+ connection.disconnect(receiptId);
+ if (!connection.receive().getAction().equals(Stomp.Responses.RECEIPT)) {
+ throw new Exception("Failed to receive receipt for disconnect.");
+ }
+ connection.close();
+ connection = null;
+ }
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native