You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/02/26 15:50:45 UTC
svn commit: r631237 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
AbstractRegion.java AbstractTempRegion.java TempQueueRegion.java
TempTopicRegion.java
Author: rajdavies
Date: Tue Feb 26 06:50:43 2008
New Revision: 631237
URL: http://svn.apache.org/viewvc?rev=631237&view=rev
Log:
Introduced AbstractTempRegion - to cater for temp destination
usage on failover
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=631237&r1=631236&r2=631237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Tue Feb 26 06:50:43 2008
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -83,7 +84,7 @@
this.destinationFactory = destinationFactory;
}
- public final void start() throws Exception {
+ public final void start() throws Exception {
started = true;
Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
@@ -182,8 +183,7 @@
}
destinationMap.removeAll(destination);
- dest.dispose(context);
- dest.stop();
+ dispose(context,dest);
} else {
LOG.debug("Destination doesn't exist: " + dest);
@@ -334,8 +334,15 @@
Subscription sub = consumerExchange.getSubscription();
if (sub == null) {
sub = subscriptions.get(ack.getConsumerId());
+
if (sub == null) {
- throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
+ //networked subscriptions are going to acknowledge in-flight messages
+ //on behalf of a subscription that no longer exists ...
+ if (!consumerExchange.getConnectionContext().isNetworkConnection()) {
+ throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
+ }else {
+ return;
+ }
}
consumerExchange.setSubscription(sub);
}
@@ -427,7 +434,9 @@
dest.removeProducer(context, info);
}
}
-
-
-
+
+ protected void dispose(ConnectionContext context,Destination dest) throws Exception {
+ dest.dispose(context);
+ dest.stop();
+ }
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java?rev=631237&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java Tue Feb 26 06:50:43 2008
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ *
+ */
+public abstract class AbstractTempRegion extends AbstractRegion {
+ private static int TIME_BEFORE_PURGE = 60000;
+ private static final Log LOG = LogFactory.getLog(TempQueueRegion.class);
+ private Map<CachedDestination,Destination> cachedDestinations = new ConcurrentHashMap<CachedDestination,Destination>();
+ private final Timer purgeTimer;
+ private final TimerTask purgeTask;
+ /**
+ * @param broker
+ * @param destinationStatistics
+ * @param memoryManager
+ * @param taskRunnerFactory
+ * @param destinationFactory
+ */
+ public AbstractTempRegion(RegionBroker broker,
+ DestinationStatistics destinationStatistics,
+ SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
+ DestinationFactory destinationFactory) {
+ super(broker, destinationStatistics, memoryManager, taskRunnerFactory,
+ destinationFactory);
+ this.purgeTimer = new Timer(true);
+ this.purgeTask = new TimerTask() {
+ public void run() {
+ doPurge();
+ }
+
+ };
+ this.purgeTimer.schedule(purgeTask, TIME_BEFORE_PURGE,TIME_BEFORE_PURGE);
+ }
+
+
+ public void stop() throws Exception {
+ super.stop();
+ if (purgeTimer != null) {
+ purgeTimer.cancel();
+ }
+ }
+
+ protected abstract Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception;
+
+ protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
+ Destination result = cachedDestinations.remove(new CachedDestination(destination));
+ if (result==null) {
+ result = doCreateDestination(context, destination);
+ }
+ return result;
+ }
+
+ protected final void dispose(ConnectionContext context,Destination dest) throws Exception {
+ //add to cache
+ cachedDestinations.put(new CachedDestination(dest.getActiveMQDestination()), dest);
+ }
+
+ private void doDispose(Destination dest) {
+ ConnectionContext context = new ConnectionContext();
+ try {
+ dest.dispose(context);
+ dest.stop();
+ } catch (Exception e) {
+ LOG.warn("Failed to dispose of " + dest,e);
+ }
+
+ }
+
+ private void doPurge() {
+ long currentTime = System.currentTimeMillis();
+ if (cachedDestinations.size() > 0) {
+ Set<CachedDestination> tmp = new HashSet<CachedDestination>(cachedDestinations.keySet());
+ for(CachedDestination key: tmp) {
+ if ((key.timeStamp + TIME_BEFORE_PURGE) < currentTime) {
+ Destination dest = cachedDestinations.remove(key);
+ if (dest != null) {
+ doDispose(dest);
+ }
+ }
+ }
+ }
+ }
+
+ static class CachedDestination{
+ long timeStamp;
+ ActiveMQDestination destination;
+
+ CachedDestination(ActiveMQDestination destination){
+ this.destination=destination;
+ this.timeStamp=System.currentTimeMillis();
+ }
+
+ public int hashCode() {
+ return destination.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ if (o instanceof ActiveMQDestination) {
+ CachedDestination other = (CachedDestination) o;
+ return other.destination.equals(this.destination);
+ }
+ return false;
+ }
+
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=631237&r1=631236&r2=631237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Tue Feb 26 06:50:43 2008
@@ -19,18 +19,22 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.7 $
*/
-public class TempQueueRegion extends AbstractRegion {
-
+public class TempQueueRegion extends AbstractTempRegion {
+ private static final Log LOG = LogFactory.getLog(TempQueueRegion.class);
+
public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
@@ -39,20 +43,26 @@
// setAutoCreateDestinations(false);
}
- protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
+ protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
-
// Only consumers on the same connection can consume from
// the temporary destination
- if (!context.isNetworkConnection() && !tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())) {
- throw new JMSException("Cannot subscribe to remote temporary destination: " + tempDest);
+ // However, we could have failed over - and we do this
+ // check client side anyways ....
+ if (!context.isFaultTolerant()
+ && (!context.isNetworkConnection() && !tempDest
+ .getConnectionId().equals(
+ sub.getConsumerInfo().getConsumerId()
+ .getConnectionId()))) {
+
+ tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId());
+ LOG.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
}
super.addSubscription(context, sub);
};
-
};
}
@@ -79,5 +89,4 @@
super.removeDestination(context, destination, timeout);
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=631237&r1=631236&r2=631237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Tue Feb 26 06:50:43 2008
@@ -30,7 +30,7 @@
/**
* @version $Revision: 1.7 $
*/
-public class TempTopicRegion extends AbstractRegion {
+public class TempTopicRegion extends AbstractTempRegion {
private static final Log LOG = LogFactory.getLog(TempTopicRegion.class);
@@ -80,5 +80,11 @@
}
super.removeDestination(context, destination, timeout);
+ }
+
+
+ protected Destination doCreateDestination(ConnectionContext context,
+ ActiveMQDestination destination) throws Exception {
+ return destinationFactory.createDestination(context, destination, destinationStatistics);
}
}