You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/04/10 19:18:14 UTC

svn commit: r646880 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/

Author: rajdavies
Date: Thu Apr 10 10:18:05 2008
New Revision: 646880

URL: http://svn.apache.org/viewvc?rev=646880&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1656

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Thu Apr 10 10:18:05 2008
@@ -312,5 +312,10 @@
      * @param messageReference
      */
     void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference);
+    
+    /**
+     * @return the broker sequence id to set on a message's MessageId
+     */
+    long getBrokerSequenceId();
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Thu Apr 10 10:18:05 2008
@@ -260,4 +260,8 @@
     public Broker getRoot() {
         return next.getRoot();
     }
+
+    public long getBrokerSequenceId() {
+        return next.getBrokerSequenceId();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Thu Apr 10 10:18:05 2008
@@ -252,4 +252,8 @@
     public Broker getRoot() {
         return null;
     }
+    
+    public long getBrokerSequenceId() {
+        return -1l;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Thu Apr 10 10:18:05 2008
@@ -263,4 +263,8 @@
     public Broker getRoot() {
         throw new BrokerStoppedException(this.message);
     }
+    
+    public long getBrokerSequenceId() {
+        throw new BrokerStoppedException(this.message);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Thu Apr 10 10:18:05 2008
@@ -273,5 +273,9 @@
     public Broker getRoot() {
         return getNext().getRoot();
     }
+    
+    public long getBrokerSequenceId() {
+        return getNext().getBrokerSequenceId();
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Apr 10 10:18:05 2008
@@ -46,6 +46,7 @@
     private boolean lazyDispatch=false;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
     protected final BrokerService brokerService;
+    protected final Broker regionBroker;
     
     /**
      * @param broker 
@@ -66,6 +67,7 @@
         this.systemUsage = brokerService.getProducerSystemUsage();
         this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
         this.memoryUsage.setUsagePortion(1.0f);
+        this.regionBroker = brokerService.getRegionBroker();
     }
     
     /**
@@ -193,5 +195,9 @@
 
     public void setLazyDispatch(boolean lazyDispatch) {
         this.lazyDispatch = lazyDispatch;
-    }      
+    } 
+    
+    protected long getDestinationSequenceId() {
+        return regionBroker.getBrokerSequenceId();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Apr 10 10:18:05 2008
@@ -430,7 +430,7 @@
                                 "Connection closed, send aborted.");
                     }
                 }
-
+                message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                 store.addMessage(context, message);
 
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Apr 10 10:18:05 2008
@@ -418,8 +418,6 @@
     }
 
     public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
-        long si = sequenceGenerator.getNextSequenceId();
-        message.getMessageId().setBrokerSequenceId(si);
         message.setBrokerInTime(System.currentTimeMillis());
         if (producerExchange.isMutable() || producerExchange.getRegion() == null) {
             ActiveMQDestination destination = message.getDestination();
@@ -728,6 +726,15 @@
         } catch (Exception e) {
             LOG.fatal("Trying to get Root Broker " + e);
             throw new RuntimeException("The broker from the BrokerService should not throw an exception");
+        }
+    }
+    
+    /**
+     * @return the next sequence id obtained from the sequenceGenerator
+     */
+    public long getBrokerSequenceId() {
+        synchronized(sequenceGenerator) {
+            return sequenceGenerator.getNextSequenceId();
         }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Apr 10 10:18:05 2008
@@ -382,6 +382,7 @@
         final ConnectionContext context = producerExchange
                 .getConnectionContext();
         message.setRegionDestination(this);
+        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
 
         if (topicStore != null && message.isPersistent()
                 && !canOptimizeOutPersistence()) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?rev=646880&r1=646879&r2=646880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Thu Apr 10 10:18:05 2008
@@ -258,4 +258,8 @@
     public Broker getRoot() {
         return this;
     }
+    
+    public long getBrokerSequenceId() {
+        return -1l;
+    }
 }