You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/02/26 15:50:45 UTC

svn commit: r631237 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: AbstractRegion.java AbstractTempRegion.java TempQueueRegion.java TempTopicRegion.java

Author: rajdavies
Date: Tue Feb 26 06:50:43 2008
New Revision: 631237

URL: http://svn.apache.org/viewvc?rev=631237&view=rev
Log:
Introduced AbstractTempRegion - to cater for temp destination 
usage on  failover

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=631237&r1=631236&r2=631237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Tue Feb 26 06:50:43 2008
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -83,7 +84,7 @@
         this.destinationFactory = destinationFactory;
     }
 
-    public final void start() throws Exception {
+    public final  void start() throws Exception {
         started = true;
 
         Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
@@ -182,8 +183,7 @@
                 }
 
                 destinationMap.removeAll(destination);
-                dest.dispose(context);
-                dest.stop();
+                dispose(context,dest);
 
             } else {
                 LOG.debug("Destination doesn't exist: " + dest);
@@ -334,8 +334,15 @@
         Subscription sub = consumerExchange.getSubscription();
         if (sub == null) {
             sub = subscriptions.get(ack.getConsumerId());
+            
             if (sub == null) {
-                throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
+                //networked subscriptions are going to acknowledge in flight messages 
+                //on behalf a subscription that is no more ...
+                if (!consumerExchange.getConnectionContext().isNetworkConnection()) {
+                    throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
+                }else {
+                    return;
+                }
             }
             consumerExchange.setSubscription(sub);
         }
@@ -427,7 +434,9 @@
             dest.removeProducer(context, info);
         }
     }
-
-
-
+    
+    protected void dispose(ConnectionContext context,Destination dest) throws Exception {
+        dest.dispose(context);
+        dest.stop();
+    }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java?rev=631237&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java Tue Feb 26 06:50:43 2008
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ *
+ */
+public abstract class AbstractTempRegion extends AbstractRegion {
+    private static int TIME_BEFORE_PURGE = 60000;
+    private static final Log LOG = LogFactory.getLog(TempQueueRegion.class);
+    private Map<CachedDestination,Destination> cachedDestinations = new ConcurrentHashMap<CachedDestination,Destination>();
+    private final Timer purgeTimer;
+    private final TimerTask purgeTask;
+    /**
+     * @param broker
+     * @param destinationStatistics
+     * @param memoryManager
+     * @param taskRunnerFactory
+     * @param destinationFactory
+     */
+    public AbstractTempRegion(RegionBroker broker,
+            DestinationStatistics destinationStatistics,
+            SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
+            DestinationFactory destinationFactory) {
+        super(broker, destinationStatistics, memoryManager, taskRunnerFactory,
+                destinationFactory);
+        this.purgeTimer = new Timer(true);
+        this.purgeTask = new TimerTask() {
+            public void run() {
+                doPurge();
+            }
+            
+        };
+        this.purgeTimer.schedule(purgeTask, TIME_BEFORE_PURGE,TIME_BEFORE_PURGE);
+    }
+    
+       
+    public void stop() throws Exception {
+        super.stop();
+        if (purgeTimer != null) {
+            purgeTimer.cancel();
+        }
+    }
+    
+    protected abstract Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception;
+
+    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
+        Destination result = cachedDestinations.remove(new CachedDestination(destination));
+        if (result==null) {
+            result = doCreateDestination(context, destination);
+        }
+        return result;
+    }
+    
+    protected final void dispose(ConnectionContext context,Destination dest) throws Exception {
+        //add to cache
+        cachedDestinations.put(new CachedDestination(dest.getActiveMQDestination()), dest);
+    }
+    
+    private void doDispose(Destination dest) {
+        ConnectionContext context = new ConnectionContext();
+        try {
+            dest.dispose(context);
+            dest.stop();
+        } catch (Exception e) {
+           LOG.warn("Failed to dispose of " + dest,e);
+        }
+      
+    }
+    
+    private void doPurge() {
+        long currentTime = System.currentTimeMillis();
+        if (cachedDestinations.size() > 0) {
+            Set<CachedDestination> tmp = new HashSet<CachedDestination>(cachedDestinations.keySet());
+            for(CachedDestination key: tmp) {
+                if ((key.timeStamp + TIME_BEFORE_PURGE) < currentTime) {
+                    Destination dest = cachedDestinations.remove(key);
+                    if (dest != null) {
+                        doDispose(dest);
+                    }
+                }
+            }
+        }
+    }
+    
+    static class CachedDestination{
+        long timeStamp;
+        ActiveMQDestination destination;
+        
+        CachedDestination(ActiveMQDestination destination){
+            this.destination=destination;
+            this.timeStamp=System.currentTimeMillis();
+        }
+        
+        public int hashCode() {
+            return destination.hashCode();
+        }
+        
+        public boolean equals(Object o) {
+            if (o instanceof ActiveMQDestination) {
+                CachedDestination other = (CachedDestination) o;
+                return other.destination.equals(this.destination);
+            }
+            return false;
+        }
+        
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=631237&r1=631236&r2=631237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Tue Feb 26 06:50:43 2008
@@ -19,18 +19,22 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
+import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.7 $
  */
-public class TempQueueRegion extends AbstractRegion {
-
+public class TempQueueRegion extends AbstractTempRegion {
+    private static final Log LOG = LogFactory.getLog(TempQueueRegion.class);
+    
     public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                            DestinationFactory destinationFactory) {
         super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
@@ -39,20 +43,26 @@
         // setAutoCreateDestinations(false);
     }
 
-    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
+    protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
         final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
         return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory) {
 
             public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
-
                 // Only consumers on the same connection can consume from
                 // the temporary destination
-                if (!context.isNetworkConnection() && !tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())) {
-                    throw new JMSException("Cannot subscribe to remote temporary destination: " + tempDest);
+                // However, we could have failed over - and we do this
+                // check client side anyways ....
+                if (!context.isFaultTolerant()
+                        && (!context.isNetworkConnection() && !tempDest
+                                .getConnectionId().equals(
+                                        sub.getConsumerInfo().getConsumerId()
+                                                .getConnectionId()))) {
+
+                    tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId());
+                    LOG.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
                 }
                 super.addSubscription(context, sub);
             };
-
         };
     }
 
@@ -79,5 +89,4 @@
 
         super.removeDestination(context, destination, timeout);
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=631237&r1=631236&r2=631237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Tue Feb 26 06:50:43 2008
@@ -30,7 +30,7 @@
 /**
  * @version $Revision: 1.7 $
  */
-public class TempTopicRegion extends AbstractRegion {
+public class TempTopicRegion extends AbstractTempRegion {
 
     private static final Log LOG = LogFactory.getLog(TempTopicRegion.class);
 
@@ -80,5 +80,11 @@
         }
 
         super.removeDestination(context, destination, timeout);
+    }
+
+    
+    protected Destination doCreateDestination(ConnectionContext context,
+            ActiveMQDestination destination) throws Exception {
+        return destinationFactory.createDestination(context, destination, destinationStatistics);
     }
 }