You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/04/10 19:18:14 UTC
svn commit: r646880 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/broker/
Author: rajdavies
Date: Thu Apr 10 10:18:05 2008
New Revision: 646880
URL: http://svn.apache.org/viewvc?rev=646880&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1656
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Thu Apr 10 10:18:05 2008
@@ -312,5 +312,10 @@
* @param messageReference
*/
void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference);
+
+ /**
+ * @return the broker sequence id
+ */
+ long getBrokerSequenceId();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Thu Apr 10 10:18:05 2008
@@ -260,4 +260,8 @@
public Broker getRoot() {
return next.getRoot();
}
+
+ public long getBrokerSequenceId() {
+ return next.getBrokerSequenceId();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Thu Apr 10 10:18:05 2008
@@ -252,4 +252,8 @@
public Broker getRoot() {
return null;
}
+
+ public long getBrokerSequenceId() {
+ return -1l;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Thu Apr 10 10:18:05 2008
@@ -263,4 +263,8 @@
public Broker getRoot() {
throw new BrokerStoppedException(this.message);
}
+
+ public long getBrokerSequenceId() {
+ throw new BrokerStoppedException(this.message);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Thu Apr 10 10:18:05 2008
@@ -273,5 +273,9 @@
public Broker getRoot() {
return getNext().getRoot();
}
+
+ public long getBrokerSequenceId() {
+ return getNext().getBrokerSequenceId();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Apr 10 10:18:05 2008
@@ -46,6 +46,7 @@
private boolean lazyDispatch=false;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected final BrokerService brokerService;
+ protected final Broker regionBroker;
/**
* @param broker
@@ -66,6 +67,7 @@
this.systemUsage = brokerService.getProducerSystemUsage();
this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
this.memoryUsage.setUsagePortion(1.0f);
+ this.regionBroker = brokerService.getRegionBroker();
}
/**
@@ -193,5 +195,9 @@
public void setLazyDispatch(boolean lazyDispatch) {
this.lazyDispatch = lazyDispatch;
- }
+ }
+
+ protected long getDestinationSequenceId() {
+ return regionBroker.getBrokerSequenceId();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Apr 10 10:18:05 2008
@@ -430,7 +430,7 @@
"Connection closed, send aborted.");
}
}
-
+ message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
store.addMessage(context, message);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Apr 10 10:18:05 2008
@@ -418,8 +418,6 @@
}
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
- long si = sequenceGenerator.getNextSequenceId();
- message.getMessageId().setBrokerSequenceId(si);
message.setBrokerInTime(System.currentTimeMillis());
if (producerExchange.isMutable() || producerExchange.getRegion() == null) {
ActiveMQDestination destination = message.getDestination();
@@ -728,6 +726,15 @@
} catch (Exception e) {
LOG.fatal("Trying to get Root Broker " + e);
throw new RuntimeException("The broker from the BrokerService should not throw an exception");
+ }
+ }
+
+ /**
+ * @return the broker sequence id
+ */
+ public long getBrokerSequenceId() {
+ synchronized(sequenceGenerator) {
+ return sequenceGenerator.getNextSequenceId();
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Apr 10 10:18:05 2008
@@ -382,6 +382,7 @@
final ConnectionContext context = producerExchange
.getConnectionContext();
message.setRegionDestination(this);
+ message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (topicStore != null && message.isPersistent()
&& !canOptimizeOutPersistence()) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Thu Apr 10 10:18:05 2008
@@ -258,4 +258,8 @@
public Broker getRoot() {
return this;
}
+
+ public long getBrokerSequenceId() {
+ return -1l;
+ }
}