You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2014/12/10 13:52:48 UTC
[2/3] activemq-6 git commit: ActiveMQ6-6 Factor out WildFly XA Recovery
ActiveMQ6-6 Factor out WildFly XA Recovery
Pulls the WildFly-specific XA recovery code out into a separate project.
Some XA recovery code remains here and serves as the integration point
for transaction manager (TM) XA recovery processes.
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/8f91af1b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/8f91af1b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/8f91af1b
Branch: refs/heads/master
Commit: 8f91af1b5cdf6ce6b0e4572919a9b7653ef05da9
Parents: d7c7d86
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Dec 2 12:58:08 2014 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Dec 9 16:08:34 2014 +0000
----------------------------------------------------------------------
activemq-jms-server/pom.xml | 11 +-
.../activemq/jms/bridge/impl/JMSBridgeImpl.java | 18 +-
.../jms/server/ActiveMQJMSServerLogger.java | 11 -
.../recovery/ActiveMQRecoveryRegistry.java | 254 ---------
.../server/recovery/ActiveMQRegistryBase.java | 75 ---
.../recovery/ActiveMQXAResourceRecovery.java | 235 --------
.../recovery/ActiveMQXAResourceWrapper.java | 535 -------------------
.../jms/server/recovery/RecoveryDiscovery.java | 236 --------
.../jms/server/recovery/XARecoveryConfig.java | 171 ------
.../jms/server/recovery/package-info.java | 18 -
.../ra/ActiveMQRAManagedConnectionFactory.java | 2 +-
.../activemq/ra/inflow/ActiveMQActivation.java | 2 +-
.../activemq/ra/recovery/RecoveryManager.java | 18 +-
activemq-service-extensions/pom.xml | 5 +
.../xa/recovery/ActiveMQRegistry.java | 33 ++
.../xa/recovery/ActiveMQRegistryImpl.java | 60 +++
.../xa/recovery/ActiveMQXARecoveryLogger.java | 118 ++++
.../xa/recovery/ActiveMQXAResourceRecovery.java | 233 ++++++++
.../xa/recovery/ActiveMQXAResourceWrapper.java | 534 ++++++++++++++++++
.../xa/recovery/XARecoveryConfig.java | 171 ++++++
tests/byteman-tests/pom.xml | 4 +-
.../integration/ra/ResourceAdapterTest.java | 21 +-
tests/joram-tests/pom.xml | 4 +-
tests/performance-tests/pom.xml | 4 +-
tests/soak-tests/pom.xml | 4 +-
tests/stress-tests/pom.xml | 4 +-
tests/timing-tests/pom.xml | 4 +-
27 files changed, 1203 insertions(+), 1582 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-jms-server/pom.xml b/activemq-jms-server/pom.xml
index 73215ae..246dea3 100644
--- a/activemq-jms-server/pom.xml
+++ b/activemq-jms-server/pom.xml
@@ -50,13 +50,14 @@
<artifactId>geronimo-ejb_3.0_spec</artifactId>
</dependency>
<dependency>
- <groupId>org.jboss.jbossts.jts</groupId>
- <artifactId>jbossjts-jacorb</artifactId>
- <optional>true</optional>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jta_1.1_spec</artifactId>
+ <version>1.1.1</version>
</dependency>
<dependency>
- <groupId>org.jboss</groupId>
- <artifactId>jboss-transaction-spi</artifactId>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-service-extensions</artifactId>
+ <version>${project.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
index 169bce8..fed9420 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
@@ -44,6 +44,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -63,9 +64,10 @@ import org.apache.activemq.jms.client.ActiveMQConnection;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQMessage;
import org.apache.activemq.jms.server.ActiveMQJMSServerBundle;
-import org.apache.activemq.jms.server.recovery.ActiveMQRegistryBase;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
import org.apache.activemq.service.extensions.ServiceUtils;
+import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry;
+import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistryImpl;
+import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.utils.ClassloadingUtil;
import org.apache.activemq.utils.DefaultSensitiveStringCodec;
import org.apache.activemq.utils.PasswordMaskingUtil;
@@ -183,7 +185,7 @@ public final class JMSBridgeImpl implements JMSBridge
private static final int FORWARD_MODE_NONTX = 2;
- private ActiveMQRegistryBase registry;
+ private ActiveMQRegistry registry;
/*
* Constructor for MBean
@@ -2228,7 +2230,15 @@ public final class JMSBridgeImpl implements JMSBridge
{
try
{
- registry = (ActiveMQRegistryBase) safeInitNewInstance(locatorClasse);
+ ServiceLoader<ActiveMQRegistry> sl = ServiceLoader.load(ActiveMQRegistry.class);
+ if (sl.iterator().hasNext())
+ {
+ registry = sl.iterator().next();
+ }
+ else
+ {
+ registry = ActiveMQRegistryImpl.getInstance();
+ }
}
catch (Throwable e)
{
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/ActiveMQJMSServerLogger.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/ActiveMQJMSServerLogger.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/ActiveMQJMSServerLogger.java
index 1fbd0b7..eb97abb 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/ActiveMQJMSServerLogger.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/ActiveMQJMSServerLogger.java
@@ -17,7 +17,6 @@
package org.apache.activemq.jms.server;
import org.apache.activemq.api.core.client.ClientSessionFactory;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.Cause;
@@ -90,11 +89,6 @@ public interface ActiveMQJMSServerLogger extends BasicLogger
format = Message.Format.MESSAGE_FORMAT)
void xaRecoverConnectionError(@Cause Exception e, ClientSessionFactory csf);
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 122015, value = "Can not connect to {0} on auto-generated resource recovery",
- format = Message.Format.MESSAGE_FORMAT)
- void xaRecoverAutoConnectionError(@Cause Throwable e, XARecoveryConfig csf);
-
@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 122016, value = "Error in XA Recovery" , format = Message.Format.MESSAGE_FORMAT)
void xaRecoveryError(@Cause Exception e);
@@ -104,11 +98,6 @@ public interface ActiveMQJMSServerLogger extends BasicLogger
format = Message.Format.MESSAGE_FORMAT)
void failedToCorrectHost(@Cause Exception e, String name);
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 122018, value = "Could not start recovery discovery on {0}, we will retry every recovery scan until the server is available",
- format = Message.Format.MESSAGE_FORMAT)
- void xaRecoveryStartError(XARecoveryConfig e);
-
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 124000, value = "key attribute missing for JMS configuration {0}" , format = Message.Format.MESSAGE_FORMAT)
void jmsConfigMissingKey(Node e);
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java
deleted file mode 100644
index f0fd2e7..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import javax.transaction.xa.XAResource;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.activemq.api.core.Pair;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
-import org.jboss.tm.XAResourceRecovery;
-
-/**
- * <p>This class is used by the Resource Adapter to register RecoveryDiscovery, which is based on the {@link XARecoveryConfig}</p>
- * <p>Each outbound or inboud connection will pass the configuration here through by calling the method {@link ActiveMQRecoveryRegistry#register(XARecoveryConfig)}</p>
- * <p>Later the {@link RecoveryDiscovery} will call {@link ActiveMQRecoveryRegistry#nodeUp(String, Pair, String, String)}
- * so we will keep a track of nodes on the cluster
- * or nodes where this server is connected to. </p>
- *
- * @author clebertsuconic
- */
-public class ActiveMQRecoveryRegistry implements XAResourceRecovery
-{
-
- private static final ActiveMQRecoveryRegistry theInstance = new ActiveMQRecoveryRegistry();
-
- private final ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery> configSet = new ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery>();
-
- /**
- * The list by server id and resource adapter wrapper, what will actually be calling recovery.
- * This will be returned by getXAResources
- */
- private final ConcurrentHashMap<String, ActiveMQXAResourceWrapper> recoveries = new ConcurrentHashMap<String, ActiveMQXAResourceWrapper>();
-
- /**
- * In case of failures, we retry on the next getXAResources
- */
- private final Set<RecoveryDiscovery> failedDiscoverySet = new HashSet<RecoveryDiscovery>();
-
- private ActiveMQRecoveryRegistry()
- {
- }
-
- /**
- * This will be called periodically by the Transaction Manager
- */
- public XAResource[] getXAResources()
- {
- try
- {
- checkFailures();
-
- ActiveMQXAResourceWrapper[] resourceArray = new ActiveMQXAResourceWrapper[recoveries.size()];
- resourceArray = recoveries.values().toArray(resourceArray);
-
- if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
- {
- ActiveMQJMSServerLogger.LOGGER.debug("\n=======================================================================================");
- ActiveMQJMSServerLogger.LOGGER.debug("Returning the following list on getXAREsources:");
- for (Map.Entry<String, ActiveMQXAResourceWrapper> entry : recoveries.entrySet())
- {
- ActiveMQJMSServerLogger.LOGGER.debug("server-id=" + entry.getKey() + ", value=" + entry.getValue());
- }
- ActiveMQJMSServerLogger.LOGGER.debug("=======================================================================================\n");
- }
-
- return resourceArray;
- }
- catch (Throwable e)
- {
- ActiveMQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
- return new XAResource[]{};
- }
- }
-
- public static ActiveMQRecoveryRegistry getInstance()
- {
- return theInstance;
- }
-
- /**
- * This will be called by then resource adapters, to register a new discovery
- *
- * @param resourceConfig
- */
- public void register(final XARecoveryConfig resourceConfig)
- {
- RecoveryDiscovery newInstance = new RecoveryDiscovery(resourceConfig);
- RecoveryDiscovery discoveryRecord = configSet.putIfAbsent(resourceConfig, newInstance);
- if (discoveryRecord == null)
- {
- discoveryRecord = newInstance;
- discoveryRecord.start(false);
- }
- // you could have a configuration shared with multiple MDBs or RAs
- discoveryRecord.incrementUsage();
- }
-
- /**
- * Reference counts and deactivate a configuration
- * Notice: this won't remove the servers since a server may have previous XIDs
- *
- * @param resourceConfig
- */
- public void unRegister(final XARecoveryConfig resourceConfig)
- {
- RecoveryDiscovery discoveryRecord = configSet.get(resourceConfig);
- if (discoveryRecord != null && discoveryRecord.decrementUsage() == 0)
- {
- discoveryRecord = configSet.remove(resourceConfig);
- if (discoveryRecord != null)
- {
- discoveryRecord.stop();
- }
- }
- }
-
- /**
- * We need to make sure that all resources are closed, we don't actually do this when a resourceConfig is closed but
- * maybe we should.
- */
- public void stop()
- {
- for (RecoveryDiscovery recoveryDiscovery : configSet.values())
- {
- recoveryDiscovery.stop();
- }
- for (ActiveMQXAResourceWrapper activeMQXAResourceWrapper : recoveries.values())
- {
- activeMQXAResourceWrapper.close();
- }
- recoveries.clear();
- configSet.clear();
- }
-
- /**
- * in case of a failure the Discovery will register itslef to retry
- *
- * @param failedDiscovery
- */
- public void failedDiscovery(RecoveryDiscovery failedDiscovery)
- {
- ActiveMQJMSServerLogger.LOGGER.debug("RecoveryDiscovery being set to restart:" + failedDiscovery);
- synchronized (failedDiscoverySet)
- {
- failedDiscoverySet.add(failedDiscovery);
- }
- }
-
- /**
- * @param nodeID
- * @param networkConfiguration
- * @param username
- * @param password
- */
- public void nodeUp(String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> networkConfiguration,
- String username,
- String password)
- {
-
- if (recoveries.get(nodeID) == null)
- {
- if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
- {
- ActiveMQJMSServerLogger.LOGGER.debug(nodeID + " being registered towards " + networkConfiguration);
- }
- XARecoveryConfig config = new XARecoveryConfig(true,
- extractTransportConfiguration(networkConfiguration),
- username,
- password);
-
- ActiveMQXAResourceWrapper wrapper = new ActiveMQXAResourceWrapper(config);
- recoveries.putIfAbsent(nodeID, wrapper);
- }
- }
-
- public void nodeDown(String nodeID)
- {
- }
-
- /**
- * this will go through the list of retries
- */
- private void checkFailures()
- {
- final HashSet<RecoveryDiscovery> failures = new HashSet<RecoveryDiscovery>();
-
- // it will transfer all the discoveries to a new collection
- synchronized (failedDiscoverySet)
- {
- failures.addAll(failedDiscoverySet);
- failedDiscoverySet.clear();
- }
-
- if (failures.size() > 0)
- {
- // This shouldn't happen on a regular scenario, however when this retry happens this needs
- // to be done on a new thread
- Thread t = new Thread("ActiveMQ Recovery Discovery Reinitialization")
- {
- @Override
- public void run()
- {
- for (RecoveryDiscovery discovery : failures)
- {
- try
- {
- ActiveMQJMSServerLogger.LOGGER.debug("Retrying discovery " + discovery);
- discovery.start(true);
- }
- catch (Throwable e)
- {
- ActiveMQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
- }
- }
- }
- };
-
- t.start();
- }
- }
-
- /**
- * @param networkConfiguration
- * @return
- */
- private TransportConfiguration[] extractTransportConfiguration(Pair<TransportConfiguration, TransportConfiguration> networkConfiguration)
- {
- if (networkConfiguration.getB() != null)
- {
- return new TransportConfiguration[]{networkConfiguration.getA(), networkConfiguration.getB()};
- }
- return new TransportConfiguration[]{networkConfiguration.getA()};
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java
deleted file mode 100644
index 1ed0104..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.jboss.tm.XAResourceRecoveryRegistry;
-
-/**
- * This class is a base class for the integration layer where
- * This class is used on integration points and this is just a bridge to the real registry at
- * {@link ActiveMQRecoveryRegistry}
- *
- * @author Clebert
- *
- *
- */
-public abstract class ActiveMQRegistryBase
-{
-
- private final AtomicBoolean started = new AtomicBoolean(false);
-
- public ActiveMQRegistryBase()
- {
- }
-
-
- public abstract XAResourceRecoveryRegistry getTMRegistry();
-
- public void register(final XARecoveryConfig resourceConfig)
- {
- init();
- ActiveMQRecoveryRegistry.getInstance().register(resourceConfig);
- }
-
-
-
- public void unRegister(final XARecoveryConfig resourceConfig)
- {
- init();
- ActiveMQRecoveryRegistry.getInstance().unRegister(resourceConfig);
- }
-
- public void stop()
- {
- if (started.compareAndSet(true, false) && getTMRegistry() != null)
- {
- getTMRegistry().removeXAResourceRecovery(ActiveMQRecoveryRegistry.getInstance());
- ActiveMQRecoveryRegistry.getInstance().stop();
- }
- }
-
- private void init()
- {
- if (started.compareAndSet(false, true) && getTMRegistry() != null)
- {
- getTMRegistry().addXAResourceRecovery(ActiveMQRecoveryRegistry.getInstance());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java
deleted file mode 100644
index 4f85925..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import javax.transaction.xa.XAResource;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.arjuna.ats.jta.recovery.XAResourceRecovery;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
-
-/**
- * A XAResourceRecovery instance that can be used to recover any JMS provider.
- * <p>
- * In reality only recover, rollback and commit will be called but we still need to be implement all
- * methods just in case.
- * <p>
- * To enable this add the following to the jbossts-properties file
- * <pre>
- * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ1"
- * value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory"/>
- * </pre>
- * <p>
- * you'll need something like this if the ActiveMQ Server is remote
- * <pre>
- * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2"
- * value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/>
- * </pre>
- * <p>
- * you'll need something like this if the ActiveMQ Server is remote and has failover configured
- * <pre>
- * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2"
- * value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/>
- * </pre>
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @version <tt>$Revision: 1.1 $</tt>
- */
-public class ActiveMQXAResourceRecovery implements XAResourceRecovery
-{
- private final boolean trace = ActiveMQJMSServerLogger.LOGGER.isTraceEnabled();
-
- private boolean hasMore;
-
- private ActiveMQXAResourceWrapper res;
-
- public ActiveMQXAResourceRecovery()
- {
- if (trace)
- {
- ActiveMQJMSServerLogger.LOGGER.trace("Constructing ActiveMQXAResourceRecovery");
- }
- }
-
- public boolean initialise(final String config)
- {
- if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
- {
- ActiveMQJMSServerLogger.LOGGER.trace(this + " intialise: " + config);
- }
-
- String[] configs = config.split(";");
- XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
- for (int i = 0, configsLength = configs.length; i < configsLength; i++)
- {
- String s = configs[i];
- ConfigParser parser = new ConfigParser(s);
- String connectorFactoryClassName = parser.getConnectorFactoryClassName();
- Map<String, Object> connectorParams = parser.getConnectorParameters();
- String username = parser.getUsername();
- String password = parser.getPassword();
- TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
- xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password);
- }
-
-
- res = new ActiveMQXAResourceWrapper(xaRecoveryConfigs);
-
- if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
- {
- ActiveMQJMSServerLogger.LOGGER.trace(this + " initialised");
- }
-
- return true;
- }
-
- public boolean hasMoreResources()
- {
- if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
- {
- ActiveMQJMSServerLogger.LOGGER.trace(this + " hasMoreResources");
- }
-
- /*
- * The way hasMoreResources is supposed to work is as follows:
- * For each "sweep" the recovery manager will call hasMoreResources, then if it returns
- * true it will call getXAResource.
- * It will repeat that until hasMoreResources returns false.
- * Then the sweep is over.
- * For the next sweep hasMoreResources should return true, etc.
- *
- * In our case where we only need to return one XAResource per sweep,
- * hasMoreResources should basically alternate between true and false.
- *
- *
- */
-
- hasMore = !hasMore;
-
- return hasMore;
- }
-
- public XAResource getXAResource()
- {
- if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
- {
- ActiveMQJMSServerLogger.LOGGER.trace(this + " getXAResource");
- }
-
- return res;
- }
-
- public XAResource[] getXAResources()
- {
- return new XAResource[]{res};
- }
-
- @Override
- protected void finalize()
- {
- res.close();
- }
-
- public static class ConfigParser
- {
- private final String connectorFactoryClassName;
-
- private final Map<String, Object> connectorParameters;
-
- private String username;
-
- private String password;
-
- public ConfigParser(final String config)
- {
- if (config == null || config.length() == 0)
- {
- throw new IllegalArgumentException("Must specify provider connector factory class name in config");
- }
-
- String[] strings = config.split(",");
-
- // First (mandatory) param is the connector factory class name
- if (strings.length < 1)
- {
- throw new IllegalArgumentException("Must specify provider connector factory class name in config");
- }
-
- connectorFactoryClassName = strings[0].trim();
-
- // Next two (optional) parameters are the username and password to use for creating the session for recovery
-
- if (strings.length >= 2)
- {
-
- username = strings[1].trim();
- if (username.length() == 0)
- {
- username = null;
- }
-
- if (strings.length == 2)
- {
- throw new IllegalArgumentException("If username is specified, password must be specified too");
- }
-
- password = strings[2].trim();
- if (password.length() == 0)
- {
- password = null;
- }
- }
-
- // other tokens are for connector configurations
- connectorParameters = new HashMap<String, Object>();
- if (strings.length >= 3)
- {
- for (int i = 3; i < strings.length; i++)
- {
- String[] str = strings[i].split("=");
- if (str.length == 2)
- {
- connectorParameters.put(str[0].trim(), str[1].trim());
- }
- }
- }
- }
-
- public String getConnectorFactoryClassName()
- {
- return connectorFactoryClassName;
- }
-
- public Map<String, Object> getConnectorParameters()
- {
- return connectorParameters;
- }
-
- public String getUsername()
- {
- return username;
- }
-
- public String getPassword()
- {
- return password;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java
deleted file mode 100644
index a7841ba..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java
+++ /dev/null
@@ -1,535 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import java.util.Arrays;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.api.core.ActiveMQExceptionType;
-import org.apache.activemq.api.core.ActiveMQNotConnectedException;
-import org.apache.activemq.api.core.client.ClientSession;
-import org.apache.activemq.api.core.client.ClientSessionFactory;
-import org.apache.activemq.api.core.client.ActiveMQClient;
-import org.apache.activemq.api.core.client.ServerLocator;
-import org.apache.activemq.api.core.client.SessionFailureListener;
-import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
-
-/**
- * XAResourceWrapper.
- *
- * Mainly from org.jboss.server.XAResourceWrapper from the JBoss AS server module
- *
- * The reason why we don't use that class directly is that it assumes on failure of connection
- * the RM_FAIL or RM_ERR is thrown, but in ActiveMQ we throw XA_RETRY since we want the recovery manager to be able
- * to retry on failure without having to manually retry
- *
- * @author <a href="adrian@jboss.com">Adrian Brock</a>
- * @author <a href="tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- *
- * @version $Revision: 45341 $
- */
-public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureListener
-{
- /** The state lock */
- private static final Object lock = new Object();
-
- private ServerLocator serverLocator;
-
- private ClientSessionFactory csf;
-
- private ClientSession delegate;
-
- private XARecoveryConfig[] xaRecoveryConfigs;
-
- public ActiveMQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
- {
- this.xaRecoveryConfigs = xaRecoveryConfigs;
-
- if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
- {
- ActiveMQJMSServerLogger.LOGGER.debug("Recovery configured with " + Arrays.toString(xaRecoveryConfigs) +
- ", instance=" +
- System.identityHashCode(this));
- }
- }
-
- /**
-  * Asks the recovery session for the transaction branches it knows about.
-  *
-  * @param flag the scan flag, passed straight through to the delegate
-  * @return the Xids reported by the delegate (may be null or empty, exactly as the delegate returns them)
-  * @throws XAException if no connection can be obtained or the delegate fails; a delegate
-  *                     failure is logged and the connection is closed before rethrowing
-  */
- public Xid[] recover(final int flag) throws XAException
- {
-    final XAResource xaResource = getDelegate(false);
-
-    if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-    {
-       ActiveMQJMSServerLogger.LOGGER.debug("looking for recover at " + xaResource + " configuration " + Arrays.toString(this.xaRecoveryConfigs));
-    }
-
-    final Xid[] recovered;
-    try
-    {
-       recovered = xaResource.recover(flag);
-    }
-    catch (XAException e)
-    {
-       ActiveMQJMSServerLogger.LOGGER.xaRecoverError(e);
-       throw check(e);
-    }
-
-    // Only worth a log line when something was actually found.
-    if (recovered != null && recovered.length > 0)
-    {
-       if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-       {
-          ActiveMQJMSServerLogger.LOGGER.debug("Recovering these following IDs " + Arrays.toString(recovered) + " at " + this);
-       }
-    }
-
-    return recovered;
- }
-
- /**
-  * Commits the transaction branch on the recovery session.
-  * <p>
-  * The delegate is obtained with {@code retry=true}, so an unavailable connection is reported
-  * to the caller as {@code XA_RETRY}. Any failure from the delegate goes through
-  * {@link #check(XAException)}, which closes the connection before rethrowing.
-  *
-  * @param xid the transaction branch to commit
-  * @param onePhase whether a one-phase commit is requested
-  * @throws XAException if no connection is available or the delegate fails
-  */
- public void commit(final Xid xid, final boolean onePhase) throws XAException
- {
-    XAResource xaResource = getDelegate(true);
-    if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-    {
-       ActiveMQJMSServerLogger.LOGGER.debug("Commit " + xaResource + " xid " + xid + " onePhase=" + onePhase);
-    }
-    try
-    {
-       xaResource.commit(xid, onePhase);
-    }
-    catch (XAException e)
-    {
-       throw check(e);
-    }
- }
-
- /**
-  * Rolls back the transaction branch on the recovery session.
-  * <p>
-  * The delegate is obtained with {@code retry=true}, so an unavailable connection is reported
-  * to the caller as {@code XA_RETRY}. Any failure from the delegate goes through
-  * {@link #check(XAException)}, which closes the connection before rethrowing.
-  *
-  * @param xid the transaction branch to roll back
-  * @throws XAException if no connection is available or the delegate fails
-  */
- public void rollback(final Xid xid) throws XAException
- {
-    XAResource xaResource = getDelegate(true);
-    if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-    {
-       ActiveMQJMSServerLogger.LOGGER.debug("Rollback " + xaResource + " xid " + xid);
-    }
-    try
-    {
-       xaResource.rollback(xid);
-    }
-    catch (XAException e)
-    {
-       throw check(e);
-    }
- }
-
- /**
-  * Tells the recovery session to forget a heuristically completed transaction branch.
-  * <p>
-  * The delegate is obtained with {@code retry=false}, so an unavailable connection is reported
-  * as {@code XAER_RMERR}. Any failure from the delegate goes through
-  * {@link #check(XAException)}, which closes the connection before rethrowing.
-  *
-  * @param xid the transaction branch to forget
-  * @throws XAException if no connection is available or the delegate fails
-  */
- public void forget(final Xid xid) throws XAException
- {
-    XAResource xaResource = getDelegate(false);
-    if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-    {
-       ActiveMQJMSServerLogger.LOGGER.debug("Forget " + xaResource + " xid " + xid);
-    }
-
-    try
-    {
-       xaResource.forget(xid);
-    }
-    catch (XAException e)
-    {
-       throw check(e);
-    }
- }
-
- /**
-  * Compares resource managers, looking through another wrapper to its underlying session.
-  *
-  * @param xaRes the resource to compare with; when it is an {@code ActiveMQXAResourceWrapper}
-  *              its own delegate is used for the comparison
-  * @return whatever the delegate's {@code isSameRM} reports
-  * @throws XAException if either delegate cannot be obtained or the comparison fails
-  */
- public boolean isSameRM(XAResource xaRes) throws XAException
- {
-    // Unwrap first: the other wrapper's delegate is resolved before our own.
-    final XAResource other = (xaRes instanceof ActiveMQXAResourceWrapper)
-       ? ((ActiveMQXAResourceWrapper)xaRes).getDelegate(false)
-       : xaRes;
-
-    final XAResource mine = getDelegate(false);
-    try
-    {
-       return mine.isSameRM(other);
-    }
-    catch (XAException e)
-    {
-       throw check(e);
-    }
- }
-
- /**
-  * Prepares the transaction branch on the recovery session.
-  * <p>
-  * The delegate is obtained with {@code retry=true}, so an unavailable connection is reported
-  * to the caller as {@code XA_RETRY}. Any failure from the delegate goes through
-  * {@link #check(XAException)}, which closes the connection before rethrowing.
-  *
-  * @param xid the transaction branch to prepare
-  * @return the vote returned by the delegate
-  * @throws XAException if no connection is available or the delegate fails
-  */
- public int prepare(final Xid xid) throws XAException
- {
-    XAResource xaResource = getDelegate(true);
-    if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-    {
-       ActiveMQJMSServerLogger.LOGGER.debug("prepare " + xaResource + " xid " + xid);
-    }
-    try
-    {
-       return xaResource.prepare(xid);
-    }
-    catch (XAException e)
-    {
-       throw check(e);
-    }
- }
-
- /**
-  * Starts work on behalf of the transaction branch on the recovery session.
-  * <p>
-  * The delegate is obtained with {@code retry=false}, so an unavailable connection is reported
-  * as {@code XAER_RMERR}. Any failure from the delegate goes through
-  * {@link #check(XAException)}, which closes the connection before rethrowing.
-  *
-  * @param xid the transaction branch
-  * @param flags the flags passed straight through to the delegate
-  * @throws XAException if no connection is available or the delegate fails
-  */
- public void start(final Xid xid, final int flags) throws XAException
- {
-    XAResource xaResource = getDelegate(false);
-    if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-    {
-       ActiveMQJMSServerLogger.LOGGER.debug("start " + xaResource + " xid " + xid);
-    }
-    try
-    {
-       xaResource.start(xid, flags);
-    }
-    catch (XAException e)
-    {
-       throw check(e);
-    }
- }
-
- /**
-  * Ends work on behalf of the transaction branch on the recovery session.
-  * <p>
-  * The delegate is obtained with {@code retry=false}, so an unavailable connection is reported
-  * as {@code XAER_RMERR}. Any failure from the delegate goes through
-  * {@link #check(XAException)}, which closes the connection before rethrowing.
-  *
-  * @param xid the transaction branch
-  * @param flags the flags passed straight through to the delegate
-  * @throws XAException if no connection is available or the delegate fails
-  */
- public void end(final Xid xid, final int flags) throws XAException
- {
-    XAResource xaResource = getDelegate(false);
-    if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-    {
-       ActiveMQJMSServerLogger.LOGGER.debug("end " + xaResource + " xid " + xid);
-    }
-    try
-    {
-       xaResource.end(xid, flags);
-    }
-    catch (XAException e)
-    {
-       throw check(e);
-    }
- }
-
- /**
-  * Reads the transaction timeout from the recovery session.
-  * <p>
-  * Any failure from the delegate goes through {@link #check(XAException)}, which closes the
-  * connection before rethrowing.
-  *
-  * @return the timeout reported by the delegate
-  * @throws XAException if no connection is available or the delegate fails
-  */
- public int getTransactionTimeout() throws XAException
- {
-    XAResource xaResource = getDelegate(false);
-    if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-    {
-       // No xid is involved in this call, so the message carries only the resource.
-       ActiveMQJMSServerLogger.LOGGER.debug("getTransactionTimeout " + xaResource);
-    }
-    try
-    {
-       return xaResource.getTransactionTimeout();
-    }
-    catch (XAException e)
-    {
-       throw check(e);
-    }
- }
-
- /**
-  * Sets the transaction timeout on the recovery session.
-  * <p>
-  * Any failure from the delegate goes through {@link #check(XAException)}, which closes the
-  * connection before rethrowing.
-  *
-  * @param seconds the timeout value handed to the delegate
-  * @return whatever the delegate's {@code setTransactionTimeout} returns
-  * @throws XAException if no connection is available or the delegate fails
-  */
- public boolean setTransactionTimeout(final int seconds) throws XAException
- {
-    XAResource xaResource = getDelegate(false);
-    if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-    {
-       // No xid is involved in this call; log the requested value instead.
-       ActiveMQJMSServerLogger.LOGGER.debug("setTransactionTimeout " + xaResource + " seconds " + seconds);
-    }
-    try
-    {
-       return xaResource.setTransactionTimeout(seconds);
-    }
-    catch (XAException e)
-    {
-       throw check(e);
-    }
- }
-
- public void connectionFailed(final ActiveMQException me, boolean failedOver)
- {
- if (me.getType() == ActiveMQExceptionType.DISCONNECTED)
- {
- if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
- {
- ActiveMQJMSServerLogger.LOGGER.debug("being disconnected for server shutdown", me);
- }
- }
- else
- {
- ActiveMQJMSServerLogger.LOGGER.xaRecoverConnectionError(me, csf);
- }
- close();
- }
-
- @Override
- public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID)
- {
- connectionFailed(me, failedOver);
- }
-
- public void beforeReconnect(final ActiveMQException me)
- {
- }
-
- /**
- * Get the connectionFactory XAResource
- *
- * @return the connectionFactory
- * @throws XAException for any problem
- */
- private XAResource getDelegate(boolean retry) throws XAException
- {
- XAResource result = null;
- Exception error = null;
- try
- {
- result = connect();
- }
- catch (Exception e)
- {
- error = e;
- }
-
- if (result == null)
- {
- // we should always throw a retry for certain methods comit etc, if not the tx is marked as a heuristic and
- // all chaos is let loose
- if (retry)
- {
- XAException xae = new XAException("Connection unavailable for xa recovery");
- xae.errorCode = XAException.XA_RETRY;
- if (error != null)
- {
- xae.initCause(error);
- }
- if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
- {
- ActiveMQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
- }
- throw xae;
- }
- else
- {
- XAException xae = new XAException("Error trying to connect to any providers for xa recovery");
- xae.errorCode = XAException.XAER_RMERR;
- if (error != null)
- {
- xae.initCause(error);
- }
- if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
- {
- ActiveMQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
- }
- throw xae;
- }
-
- }
-
- return result;
- }
-
- /**
-  * Connects to the server if not already connected.
-  * <p>
-  * The configured {@link XARecoveryConfig} entries are tried in order (null entries are
-  * skipped); the first one that yields a session wins and that session becomes the delegate.
-  * A failed attempt is logged and whatever it managed to create is closed before the next
-  * entry is tried.
-  *
-  * @return the connectionFactory XAResource
-  * @throws Exception for any problem; {@link ActiveMQNotConnectedException} when every
-  *                   configuration failed
-  */
- protected XAResource connect() throws Exception
- {
-    // Do we already have a valid connectionFactory?
-    synchronized (ActiveMQXAResourceWrapper.lock)
-    {
-       if (delegate != null)
-       {
-          return delegate;
-       }
-    }
-
-    for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs)
-    {
-
-       if (xaRecoveryConfig == null)
-       {
-          continue;
-       }
-       if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-       {
-          ActiveMQJMSServerLogger.LOGGER.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs));
-       }
-
-       // Track what THIS attempt created. The shared fields can be reset by close() (for
-       // example from a failure callback) and may still hold objects from an earlier attempt,
-       // so neither the calls below nor the cleanup go through them.
-       ServerLocator attemptLocator = null;
-       ClientSessionFactory attemptFactory = null;
-       ClientSession cs;
-
-       try
-       {
-          // setting ha=false because otherwise the connector would go towards any server, causing Heuristic exceptions
-          // we really need to control what server it's connected to
-
-          // Manual configuration may still use discovery, so we will keep this
-          if (xaRecoveryConfig.getDiscoveryConfiguration() != null)
-          {
-             attemptLocator = ActiveMQClient.createServerLocator(false, xaRecoveryConfig.getDiscoveryConfiguration());
-          }
-          else
-          {
-             attemptLocator = ActiveMQClient.createServerLocator(false, xaRecoveryConfig.getTransportConfig());
-          }
-          serverLocator = attemptLocator;
-          attemptLocator.disableFinalizeCheck();
-          attemptFactory = attemptLocator.createSessionFactory();
-          csf = attemptFactory;
-          if (xaRecoveryConfig.getUsername() == null)
-          {
-             cs = attemptFactory.createSession(true, false, false);
-          }
-          else
-          {
-             cs = attemptFactory.createSession(xaRecoveryConfig.getUsername(),
-                                               xaRecoveryConfig.getPassword(),
-                                               true,
-                                               false,
-                                               false,
-                                               false,
-                                               1);
-          }
-       }
-       catch (Throwable e)
-       {
-          ActiveMQJMSServerLogger.LOGGER.xaRecoverAutoConnectionError(e, xaRecoveryConfig);
-          if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-          {
-             ActiveMQJMSServerLogger.LOGGER.debug(e.getMessage(), e);
-          }
-
-          // No session exists at this point (creating it is the last step of the try), so
-          // only the factory and the locator may need closing. Each is closed on its own so
-          // that a failure closing one does not skip the other.
-          if (attemptFactory != null)
-          {
-             try
-             {
-                attemptFactory.close();
-             }
-             catch (Throwable ignored)
-             {
-                if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
-                {
-                   ActiveMQJMSServerLogger.LOGGER.trace(ignored.getMessage(), ignored);
-                }
-             }
-          }
-          if (attemptLocator != null)
-          {
-             try
-             {
-                attemptLocator.close();
-             }
-             catch (Throwable ignored)
-             {
-                if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
-                {
-                   ActiveMQJMSServerLogger.LOGGER.trace(ignored.getMessage(), ignored);
-                }
-             }
-          }
-          continue;
-       }
-
-       cs.addFailureListener(this);
-
-       synchronized (ActiveMQXAResourceWrapper.lock)
-       {
-          delegate = cs;
-       }
-
-       return delegate;
-    }
-    ActiveMQJMSServerLogger.LOGGER.recoveryConnectFailed(Arrays.toString(xaRecoveryConfigs));
-    throw new ActiveMQNotConnectedException();
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString()
- {
- return "ActiveMQXAResourceWrapper [serverLocator=" + serverLocator +
- ", csf=" +
- csf +
- ", delegate=" +
- delegate +
- ", xaRecoveryConfigs=" +
- Arrays.toString(xaRecoveryConfigs) +
- ", instance=" +
- System.identityHashCode(this) +
- "]";
- }
-
- /**
-  * Close the connection.
-  * <p>
-  * The session, session factory and server locator are detached from this wrapper while
-  * holding the state lock and then closed, in that order, outside of it. A failure closing
-  * one of them is logged at debug level and does not prevent the others from being closed.
-  */
- public void close()
- {
-    final ClientSession sessionToClose;
-    final ClientSessionFactory factoryToClose;
-    final ServerLocator locatorToClose;
-
-    // Detach everything atomically; the actual closing happens without the lock held.
-    synchronized (ActiveMQXAResourceWrapper.lock)
-    {
-       sessionToClose = delegate;
-       factoryToClose = csf;
-       locatorToClose = serverLocator;
-       delegate = null;
-       csf = null;
-       serverLocator = null;
-    }
-
-    if (sessionToClose != null)
-    {
-       try
-       {
-          sessionToClose.close();
-       }
-       catch (Throwable t)
-       {
-          ActiveMQJMSServerLogger.LOGGER.debug(t.getMessage(), t);
-       }
-    }
-
-    if (factoryToClose != null)
-    {
-       try
-       {
-          factoryToClose.close();
-       }
-       catch (Throwable t)
-       {
-          ActiveMQJMSServerLogger.LOGGER.debug(t.getMessage(), t);
-       }
-    }
-
-    if (locatorToClose != null)
-    {
-       try
-       {
-          locatorToClose.close();
-       }
-       catch (Throwable t)
-       {
-          ActiveMQJMSServerLogger.LOGGER.debug(t.getMessage(), t);
-       }
-    }
- }
-
- /**
- * Check whether an XAException is fatal. If it is an RM problem
- * we close the connection so the next call will reconnect.
- *
- * @param e the xa exception
- * @return never
- * @throws XAException always
- */
- protected XAException check(final XAException e) throws XAException
- {
- ActiveMQJMSServerLogger.LOGGER.xaRecoveryError(e);
-
-
- // If any exception happened, we close the connection so we may start fresh
- close();
- throw e;
- }
-
- @Override
- protected void finalize() throws Throwable
- {
- close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java
deleted file mode 100644
index d314c86..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.api.core.ActiveMQExceptionType;
-import org.apache.activemq.api.core.Pair;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.ClusterTopologyListener;
-import org.apache.activemq.api.core.client.ServerLocator;
-import org.apache.activemq.api.core.client.SessionFailureListener;
-import org.apache.activemq.api.core.client.TopologyMember;
-import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
-import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
-
-/**
- * <p>This class will have a simple Connection Factory and will listen
- * for topology updates. </p>
- * <p>This Discovery is instantiated by {@link ActiveMQRecoveryRegistry}
- *
- * @author clebertsuconic
- */
-public class RecoveryDiscovery implements SessionFailureListener
-{
-
- private ServerLocator locator;
- private ClientSessionFactoryInternal sessionFactory;
- private final XARecoveryConfig config;
- private final AtomicInteger usage = new AtomicInteger(0);
- private boolean started = false;
-
-
- public RecoveryDiscovery(XARecoveryConfig config)
- {
- this.config = config;
- }
-
- public synchronized void start(boolean retry)
- {
- if (!started)
- {
- ActiveMQJMSServerLogger.LOGGER.debug("Starting RecoveryDiscovery on " + config);
- started = true;
-
- locator = config.createServerLocator();
- locator.disableFinalizeCheck();
- locator.addClusterTopologyListener(new InternalListener(config));
- try
- {
- sessionFactory = (ClientSessionFactoryInternal) locator.createSessionFactory();
- // We are using the SessionFactoryInternal here directly as we don't have information to connect with an user and password
- // on the session as all we want here is to get the topology
- // in case of failure we will retry
- sessionFactory.addFailureListener(this);
-
- ActiveMQJMSServerLogger.LOGGER.debug("RecoveryDiscovery started fine on " + config);
- }
- catch (Exception startupError)
- {
- if (!retry)
- {
- ActiveMQJMSServerLogger.LOGGER.xaRecoveryStartError(config);
- }
- stop();
- ActiveMQRecoveryRegistry.getInstance().failedDiscovery(this);
- }
-
- }
- }
-
- public synchronized void stop()
- {
- internalStop();
- }
-
- /**
-  * Records one more user of this recovery entry.
-  * <p>
-  * We may have several connection factories referencing the same connection recovery entry.
-  * Because of that we need to keep a count of the instances that are referencing it,
-  * so it can be removed as soon as the last one is done.
-  *
-  * @return the usage count after the increment
-  */
- public int incrementUsage()
- {
-    return usage.incrementAndGet();
- }
-
- /**
-  * Records that one user of this recovery entry is done with it.
-  *
-  * @return the usage count after the decrement; zero means nobody references the entry any more
-  */
- public int decrementUsage()
- {
-    return usage.decrementAndGet();
- }
-
-
- @Override
- protected void finalize()
- {
- // I don't think it's a good thing to synchronize a method on a finalize,
- // hence the internalStop (no sync) call here
- internalStop();
- }
-
- /**
-  * Stops this discovery without taking the instance monitor: closes the session factory and
-  * the locator (when present) and clears both references. Does nothing when not started.
-  * <p>
-  * Failures while closing are logged at debug level and otherwise ignored.
-  */
- protected void internalStop()
- {
-    if (!started)
-    {
-       return;
-    }
-    started = false;
-
-    if (sessionFactory != null)
-    {
-       try
-       {
-          sessionFactory.close();
-       }
-       catch (Exception ignored)
-       {
-          ActiveMQJMSServerLogger.LOGGER.debug(ignored, ignored);
-       }
-    }
-
-    // The locator is still null if creating it failed after 'started' was already set.
-    if (locator != null)
-    {
-       try
-       {
-          locator.close();
-       }
-       catch (Exception ignored)
-       {
-          ActiveMQJMSServerLogger.LOGGER.debug(ignored, ignored);
-       }
-    }
-
-    sessionFactory = null;
-    locator = null;
- }
-
-
- static final class InternalListener implements ClusterTopologyListener
- {
- private final XARecoveryConfig config;
-
- public InternalListener(final XARecoveryConfig config)
- {
- this.config = config;
- }
-
- @Override
- public void nodeUP(TopologyMember topologyMember, boolean last)
- {
- // There is a case where the backup announce itself,
- // we need to ignore a case where getLive is null
- if (topologyMember.getLive() != null)
- {
- Pair<TransportConfiguration, TransportConfiguration> connector =
- new Pair<TransportConfiguration, TransportConfiguration>(topologyMember.getLive(),
- topologyMember.getBackup());
- ActiveMQRecoveryRegistry.getInstance().nodeUp(topologyMember.getNodeId(), connector,
- config.getUsername(), config.getPassword());
- }
- }
-
- @Override
- public void nodeDown(long eventUID, String nodeID)
- {
- // I'm not putting any node down, since it may have previous transactions hanging, however at some point we may
- //change it have some sort of timeout for removal
- }
-
- }
-
-
- @Override
- public void connectionFailed(ActiveMQException exception, boolean failedOver)
- {
- if (exception.getType() == ActiveMQExceptionType.DISCONNECTED)
- {
- ActiveMQJMSServerLogger.LOGGER.warn("being disconnected for server shutdown", exception);
- }
- else
- {
- ActiveMQJMSServerLogger.LOGGER.warn("Notified of connection failure in xa discovery, we will retry on the next recovery",
- exception);
- }
- internalStop();
- ActiveMQRecoveryRegistry.getInstance().failedDiscovery(this);
- }
-
- @Override
- public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID)
- {
- connectionFailed(me, failedOver);
- }
-
- @Override
- public void beforeReconnect(ActiveMQException exception)
- {
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString()
- {
- return "RecoveryDiscovery [config=" + config + ", started=" + started + "]";
- }
-
- @Override
- public int hashCode()
- {
- return config.hashCode();
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (o == null || (!(o instanceof RecoveryDiscovery)))
- {
- return false;
- }
- RecoveryDiscovery discovery = (RecoveryDiscovery) o;
-
- return config.equals(discovery.config);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java
deleted file mode 100644
index cf23589..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import java.util.Arrays;
-
-import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.ActiveMQClient;
-import org.apache.activemq.api.core.client.ServerLocator;
-import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
-
-/**
- *
- * This represents the configuration of a single connection factory.
- *
- * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
- * @author Clebert Suconic
- *
- * A wrapper around info needed for the xa recovery resource
- * Date: 3/23/11
- * Time: 10:15 AM
- */
-public class XARecoveryConfig
-{
-
- private final boolean ha;
- private final TransportConfiguration[] transportConfiguration;
- private final DiscoveryGroupConfiguration discoveryConfiguration;
- private final String username;
- private final String password;
-
- public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory,
- String userName,
- String password)
- {
- if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null)
- {
- return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password);
- }
- else
- {
- return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password);
- }
-
- }
-
- public XARecoveryConfig(final boolean ha, final TransportConfiguration[] transportConfiguration, final String username, final String password)
- {
- this.transportConfiguration = transportConfiguration;
- this.discoveryConfiguration = null;
- this.username = username;
- this.password = password;
- this.ha = ha;
- }
-
- public XARecoveryConfig(final boolean ha, final DiscoveryGroupConfiguration discoveryConfiguration, final String username, final String password)
- {
- this.discoveryConfiguration = discoveryConfiguration;
- this.transportConfiguration = null;
- this.username = username;
- this.password = password;
- this.ha = ha;
- }
-
- public boolean isHA()
- {
- return ha;
- }
-
- public DiscoveryGroupConfiguration getDiscoveryConfiguration()
- {
- return discoveryConfiguration;
- }
-
- public TransportConfiguration[] getTransportConfig()
- {
- return transportConfiguration;
- }
-
- public String getUsername()
- {
- return username;
- }
-
- public String getPassword()
- {
- return password;
- }
-
-
- /**
- * Create a serverLocator using the configuration
- * @return locator
- */
- public ServerLocator createServerLocator()
- {
- if (getDiscoveryConfiguration() != null)
- {
- return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration());
- }
- else
- {
- return ActiveMQClient.createServerLocator(isHA(), getTransportConfig());
- }
-
- }
-
- @Override
- public int hashCode()
- {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((discoveryConfiguration == null) ? 0 : discoveryConfiguration.hashCode());
- result = prime * result + Arrays.hashCode(transportConfiguration);
- return result;
- }
-
- /*
- * We don't use username and password on purpose.
- * Just having the connector is enough, as we don't want to duplicate resources just because of usernames
- */
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- XARecoveryConfig other = (XARecoveryConfig)obj;
- if (discoveryConfiguration == null)
- {
- if (other.discoveryConfiguration != null)
- return false;
- }
- else if (!discoveryConfiguration.equals(other.discoveryConfiguration))
- return false;
- if (!Arrays.equals(transportConfiguration, other.transportConfiguration))
- return false;
- return true;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString()
- {
- return "XARecoveryConfig [transportConfiguration = " + Arrays.toString(transportConfiguration) +
- ", discoveryConfiguration = " + discoveryConfiguration +
- ", username=" +
- username +
- ", password=****]";
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java
deleted file mode 100644
index 43746cf..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAManagedConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAManagedConnectionFactory.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAManagedConnectionFactory.java
index 0a3fa9f..7cd6a48 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAManagedConnectionFactory.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAManagedConnectionFactory.java
@@ -30,7 +30,7 @@ import java.util.Iterator;
import java.util.Set;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
+import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
/**
* ActiveMQ ManagedConnectionFactory
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
index 6747758..e76313a 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
@@ -44,12 +44,12 @@ import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.core.client.impl.ClientSessionInternal;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQDestination;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
import org.apache.activemq.ra.ActiveMQRABundle;
import org.apache.activemq.ra.ActiveMQRAConnectionFactory;
import org.apache.activemq.ra.ActiveMQRALogger;
import org.apache.activemq.ra.ActiveMQRaUtils;
import org.apache.activemq.ra.ActiveMQResourceAdapter;
+import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.utils.FutureLatch;
import org.apache.activemq.utils.SensitiveDataCodec;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-ra/src/main/java/org/apache/activemq/ra/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/recovery/RecoveryManager.java b/activemq-ra/src/main/java/org/apache/activemq/ra/recovery/RecoveryManager.java
index cf8864a..8f0754a 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/recovery/RecoveryManager.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/recovery/RecoveryManager.java
@@ -18,12 +18,14 @@ package org.apache.activemq.ra.recovery;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.ServiceLoader;
import java.util.Set;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.jms.server.recovery.ActiveMQRegistryBase;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
import org.apache.activemq.ra.ActiveMQRALogger;
+import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry;
+import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistryImpl;
+import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.utils.ClassloadingUtil;
import org.apache.activemq.utils.ConcurrentHashSet;
@@ -33,7 +35,7 @@ import org.apache.activemq.utils.ConcurrentHashSet;
*/
public final class RecoveryManager
{
- private ActiveMQRegistryBase registry;
+ private ActiveMQRegistry registry;
private static final String RESOURCE_RECOVERY_CLASS_NAMES = "org.jboss.as.messaging.jms.AS7RecoveryRegistry;"
+ "org.jboss.as.integration.activemq.recovery.AS5RecoveryRegistry";
@@ -97,7 +99,15 @@ public final class RecoveryManager
{
try
{
- registry = (ActiveMQRegistryBase) safeInitNewInstance(locatorClasse);
+ ServiceLoader<ActiveMQRegistry> sl = ServiceLoader.load(ActiveMQRegistry.class);
+ if (sl.iterator().hasNext())
+ {
+ registry = sl.iterator().next();
+ }
+ else
+ {
+ registry = ActiveMQRegistryImpl.getInstance();
+ }
}
catch (Throwable e)
{
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/pom.xml b/activemq-service-extensions/pom.xml
index c0072b3..524cfdd 100644
--- a/activemq-service-extensions/pom.xml
+++ b/activemq-service-extensions/pom.xml
@@ -19,6 +19,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-jms-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistry.java
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistry.java b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistry.java
new file mode 100644
index 0000000..bb81b23
--- /dev/null
+++ b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistry.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.service.extensions.xa.recovery;
+
+/**
+ * Registry callbacks that receive {@link XARecoveryConfig} instances and lifecycle
+ * notifications ({@link #init()} / {@link #stop()}).
+ * <p>
+ * NOTE(review): what an implementation does with a registered configuration is not defined
+ * here; presumably it hands it to a transaction manager's XA recovery process - confirm
+ * against the concrete implementations.
+ *
+ * @author mtaylor
+ */
+
+public interface ActiveMQRegistry
+{
+ /**
+ * Registers the given recovery configuration with this registry.
+ *
+ * @param resourceConfig the configuration to register
+ */
+ void register(final XARecoveryConfig resourceConfig);
+
+ /**
+ * Removes a recovery configuration from this registry.
+ *
+ * @param resourceConfig the configuration to unregister; presumably one previously passed
+ * to {@link #register(XARecoveryConfig)} - TODO confirm
+ */
+ void unRegister(final XARecoveryConfig resourceConfig);
+
+ /**
+ * Lifecycle callback signalling that the registry should stop.
+ */
+ void stop();
+
+ /**
+ * Lifecycle callback signalling that the registry should initialise itself.
+ */
+ void init();
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistryImpl.java
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistryImpl.java b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistryImpl.java
new file mode 100644
index 0000000..2dd787b
--- /dev/null
+++ b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistryImpl.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.service.extensions.xa.recovery;
+
+/**
+ * No-op {@link ActiveMQRegistry}: every callback returns without doing anything.
+ * <p>
+ * A single shared instance is handed out by {@link #getInstance()}.
+ *
+ * @author mtaylor
+ */
+public class ActiveMQRegistryImpl implements ActiveMQRegistry
+{
+   // Created eagerly during class initialisation, which the JVM performs under a lock.
+   // The previous lazy "if (instance == null)" check was unsynchronised, so two threads
+   // racing on the first call could each create (and observe) a different instance.
+   private static final ActiveMQRegistryImpl INSTANCE = new ActiveMQRegistryImpl();
+
+   /**
+    * @return the shared no-op registry; the same object on every call
+    */
+   public static ActiveMQRegistry getInstance()
+   {
+      return INSTANCE;
+   }
+
+   /**
+    * Does nothing.
+    *
+    * @param resourceConfig ignored
+    */
+   @Override
+   public void register(XARecoveryConfig resourceConfig)
+   {
+      // no-op
+   }
+
+   /**
+    * Does nothing.
+    *
+    * @param resourceConfig ignored
+    */
+   @Override
+   public void unRegister(XARecoveryConfig resourceConfig)
+   {
+      // no-op
+   }
+
+   /**
+    * Does nothing.
+    */
+   @Override
+   public void stop()
+   {
+      // no-op
+   }
+
+   /**
+    * Does nothing.
+    */
+   @Override
+   public void init()
+   {
+      // no-op
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java
new file mode 100644
index 0000000..140504a
--- /dev/null
+++ b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.service.extensions.xa.recovery;
+
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.Logger;
+import org.jboss.logging.annotations.Cause;
+import org.jboss.logging.annotations.LogMessage;
+import org.jboss.logging.annotations.Message;
+import org.jboss.logging.annotations.MessageLogger;
+import org.w3c.dom.Node;
+
+/**
+ * Message logger for the {@code org.apache.activemq.service.extensions.xa.recovery} package.
+ * <p>
+ * Logger Code 12
+ * <p>
+ * Each message id must be 6 digits long starting with 12; the 3rd digit denotes the level, so
+ * <pre>
+ * INFO 1
+ * WARN 2
+ * DEBUG 3
+ * ERROR 4
+ * TRACE 5
+ * FATAL 6
+ * </pre>
+ * so an INFO message would be 121000 to 121999.
+ * <p>
+ * NOTE(review): several messages below mention the JMS Server Manager, JNDI, connectors and the
+ * JMS deployer rather than XA recovery; presumably they were carried over from another logger -
+ * confirm whether they are still needed here.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
+ */
+@MessageLogger(projectCode = "AMQ")
+public interface ActiveMQXARecoveryLogger extends BasicLogger
+{
+ /**
+ * The default logger.
+ */
+ ActiveMQXARecoveryLogger LOGGER = Logger.getMessageLogger(ActiveMQXARecoveryLogger.class, ActiveMQXARecoveryLogger.class.getPackage().getName());
+
+ // ---- INFO (121xxx) ----
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 121003, value = "JMS Server Manager Running cached command for {0}" , format = Message.Format.MESSAGE_FORMAT)
+ void serverRunningCachedCommand(Runnable run);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 121004, value = "JMS Server Manager Caching command for {0} since the JMS Server is not active yet",
+ format = Message.Format.MESSAGE_FORMAT)
+ void serverCachingCommand(Object runnable);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 121005, value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\". If this new address is incorrect please manually configure the connector to use the proper one.",
+ format = Message.Format.MESSAGE_FORMAT)
+ void invalidHostForConnector(String name, String newHost);
+
+ // ---- WARN (122xxx) ----
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 122007, value = "Queue {0} does not exist on the topic {1}. It was deleted manually probably." , format = Message.Format.MESSAGE_FORMAT)
+ void noQueueOnTopic(String queueName, String name);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 122008, value = "XA Recovery can not connect to any ActiveMQ server on recovery {0}" , format = Message.Format.MESSAGE_FORMAT)
+ void recoveryConnectFailed(String s);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 122011, value = "error unbinding {0} from JNDI" , format = Message.Format.MESSAGE_FORMAT)
+ void jndiUnbindError(@Cause Exception e, String key);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 122012, value = "JMS Server Manager error" , format = Message.Format.MESSAGE_FORMAT)
+ void jmsServerError(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 122013, value = "Error in XA Recovery recover" , format = Message.Format.MESSAGE_FORMAT)
+ void xaRecoverError(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 122014, value = "Notified of connection failure in xa recovery connectionFactory for provider {0} will attempt reconnect on next pass",
+ format = Message.Format.MESSAGE_FORMAT)
+ void xaRecoverConnectionError(@Cause Exception e, ClientSessionFactory csf);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 122015, value = "Can not connect to {0} on auto-generated resource recovery",
+ format = Message.Format.MESSAGE_FORMAT)
+ void xaRecoverAutoConnectionError(@Cause Throwable e, XARecoveryConfig csf);
+
+ // NOTE(review): id 122016 sits in the WARN range (third digit 2) under the numbering scheme
+ // documented on this interface, yet the message is logged at DEBUG - confirm which is intended.
+ @LogMessage(level = Logger.Level.DEBUG)
+ @Message(id = 122016, value = "Error in XA Recovery" , format = Message.Format.MESSAGE_FORMAT)
+ void xaRecoveryError(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 122017, value = "Tried to correct invalid \"host\" value \"0.0.0.0\" for \"{0}\" connector, but received an exception.",
+ format = Message.Format.MESSAGE_FORMAT)
+ void failedToCorrectHost(@Cause Exception e, String name);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 122018, value = "Could not start recovery discovery on {0}, we will retry every recovery scan until the server is available",
+ format = Message.Format.MESSAGE_FORMAT)
+ void xaRecoveryStartError(XARecoveryConfig e);
+
+ // ---- ERROR (124xxx) ----
+
+ @LogMessage(level = Logger.Level.ERROR)
+ @Message(id = 124000, value = "key attribute missing for JMS configuration {0}" , format = Message.Format.MESSAGE_FORMAT)
+ void jmsConfigMissingKey(Node e);
+
+ @LogMessage(level = Logger.Level.ERROR)
+ @Message(id = 124002, value = "Failed to start JMS deployer" , format = Message.Format.MESSAGE_FORMAT)
+ void jmsDeployerStartError(@Cause Exception e);
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java
new file mode 100644
index 0000000..d9f4847
--- /dev/null
+++ b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.service.extensions.xa.recovery;
+
+import javax.transaction.xa.XAResource;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+
+/**
+ * A XAResourceRecovery instance that can be used to recover any JMS provider.
+ * <p>
+ * In reality only recover, rollback and commit will be called but we still need to implement all
+ * methods just in case.
+ * <p>
+ * To enable this add the following to the jbossts-properties file
+ * <pre>
+ * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ1"
+ * value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory"/>
+ * </pre>
+ * <p>
+ * you'll need something like this if the ActiveMQ Server is remote
+ * <pre>
+ * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2"
+ * value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/>
+ * </pre>
+ * <p>
+ * you'll need something like this if the ActiveMQ Server is remote and has failover configured
+ * <pre>
+ * <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2"
+ * value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/>
+ * </pre>
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ */
+public class ActiveMQXAResourceRecovery
+{
+   /** Flipped on every {@link #hasMoreResources()} call; holds the value that call returned. */
+   private boolean hasMore;
+
+   /** Wrapper created by {@link #initialise(String)}; {@code null} until that method has completed. */
+   private ActiveMQXAResourceWrapper res;
+
+   /**
+    * Creates an uninitialised instance; {@link #initialise(String)} must be called before an
+    * XAResource is available.
+    */
+   public ActiveMQXAResourceRecovery()
+   {
+      // Query the logger directly, as every other method of this class does, instead of
+      // keeping a per-instance snapshot of the trace level that is only read here.
+      if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.trace("Constructing ActiveMQXAResourceRecovery");
+      }
+   }
+
+   /**
+    * Builds the XAResource wrapper from a textual configuration.
+    * <p>
+    * The string is split on {@code ';'}; each piece is parsed by {@link ConfigParser} and turned
+    * into one {@link XARecoveryConfig} holding a single {@link TransportConfiguration}.
+    *
+    * @param config semicolon-separated connector entries
+    * @return always {@code true}
+    * @throws IllegalArgumentException if {@link ConfigParser} rejects one of the entries
+    * @throws NullPointerException if {@code config} is {@code null}
+    */
+   public boolean initialise(final String config)
+   {
+      if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.trace(this + " intialise: " + config);
+      }
+
+      final String[] entries = config.split(";");
+      final XARecoveryConfig[] recoveryConfigs = new XARecoveryConfig[entries.length];
+
+      int slot = 0;
+      for (final String entry : entries)
+      {
+         final ConfigParser parsed = new ConfigParser(entry);
+         final TransportConfiguration connector =
+            new TransportConfiguration(parsed.getConnectorFactoryClassName(), parsed.getConnectorParameters());
+
+         // one recovery config per entry, each wrapping exactly one connector
+         recoveryConfigs[slot++] =
+            new XARecoveryConfig(false, new TransportConfiguration[]{connector}, parsed.getUsername(), parsed.getPassword());
+      }
+
+      res = new ActiveMQXAResourceWrapper(recoveryConfigs);
+
+      if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.trace(this + " initialised");
+      }
+
+      return true;
+   }
+
+   /**
+    * Tells the recovery manager whether another XAResource should be fetched in this sweep.
+    * <p>
+    * During a sweep the recovery manager keeps calling this method, followed by
+    * {@link #getXAResource()}, until {@code false} is returned; the following sweep must start
+    * with {@code true} again. Only one XAResource is offered per sweep, so the answer simply
+    * alternates between {@code true} and {@code false} on successive calls.
+    *
+    * @return the negation of the value returned by the previous call ({@code true} on the first call)
+    */
+   public boolean hasMoreResources()
+   {
+      final ActiveMQXARecoveryLogger log = ActiveMQXARecoveryLogger.LOGGER;
+      if (log.isTraceEnabled())
+      {
+         log.trace(this + " hasMoreResources");
+      }
+
+      // flip the flag and report the new state
+      final boolean flipped = !hasMore;
+      hasMore = flipped;
+      return flipped;
+   }
+
+   /**
+    * @return the wrapper created by {@link #initialise(String)}, or {@code null} if that method
+    *         has not completed yet
+    */
+   public XAResource getXAResource()
+   {
+      final ActiveMQXARecoveryLogger log = ActiveMQXARecoveryLogger.LOGGER;
+      if (log.isTraceEnabled())
+      {
+         log.trace(this + " getXAResource");
+      }
+
+      return res;
+   }
+
+   /**
+    * @return a new one-element array holding the same object as {@link #getXAResource()}
+    */
+   public XAResource[] getXAResources()
+   {
+      final XAResource[] single = new XAResource[1];
+      single[0] = res;
+      return single;
+   }
+
+   /**
+    * Closes the wrapped resource, if one was ever created, when this instance is collected.
+    */
+   @Override
+   protected void finalize()
+   {
+      // res is only assigned at the end of initialise(String); if that method was never called
+      // or threw, there is nothing to close and dereferencing res would raise a NullPointerException.
+      if (res != null)
+      {
+         res.close();
+      }
+   }
+
+   /**
+    * Parses one comma-separated connector entry of the form
+    * {@code connectorFactoryClassName[,username,password[,key=value]...]}.
+    * <p>
+    * Every token is trimmed. A blank username or password is reported as {@code null}.
+    */
+   public static class ConfigParser
+   {
+      private final String connectorFactoryClassName;
+
+      private final Map<String, Object> connectorParameters;
+
+      private final String username;
+
+      private final String password;
+
+      /**
+       * @param config the entry to parse
+       * @throws IllegalArgumentException if {@code config} is {@code null} or empty, if the
+       *         connector factory class name (first token) is missing or blank, or if a
+       *         username is given without a password
+       */
+      public ConfigParser(final String config)
+      {
+         if (config == null || config.length() == 0)
+         {
+            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
+         }
+
+         String[] strings = config.split(",");
+
+         // First (mandatory) param is the connector factory class name.
+         // split() drops trailing empty tokens, so "," produces no tokens at all, while an
+         // entry such as " ,user,pass" produces a blank first token; neither names a class.
+         if (strings.length < 1 || strings[0].trim().length() == 0)
+         {
+            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
+         }
+
+         connectorFactoryClassName = strings[0].trim();
+
+         // Next two (optional) parameters are the username and password to use for creating the session for recovery
+         String parsedUsername = null;
+         String parsedPassword = null;
+
+         if (strings.length >= 2)
+         {
+            if (strings.length == 2)
+            {
+               throw new IllegalArgumentException("If username is specified, password must be specified too");
+            }
+
+            parsedUsername = blankToNull(strings[1]);
+            parsedPassword = blankToNull(strings[2]);
+         }
+
+         username = parsedUsername;
+         password = parsedPassword;
+
+         // other tokens are for connector configurations; a token that does not split into
+         // exactly one key and one value around '=' is ignored
+         connectorParameters = new HashMap<String, Object>();
+         for (int i = 3; i < strings.length; i++)
+         {
+            String[] str = strings[i].split("=");
+            if (str.length == 2)
+            {
+               connectorParameters.put(str[0].trim(), str[1].trim());
+            }
+         }
+      }
+
+      /** Trims the token and maps a blank result to {@code null}. */
+      private static String blankToNull(final String token)
+      {
+         final String trimmed = token.trim();
+         return trimmed.length() == 0 ? null : trimmed;
+      }
+
+      /**
+       * @return the trimmed, non-blank connector factory class name
+       */
+      public String getConnectorFactoryClassName()
+      {
+         return connectorFactoryClassName;
+      }
+
+      /**
+       * @return the parsed {@code key=value} connector parameters; empty if none were given
+       */
+      public Map<String, Object> getConnectorParameters()
+      {
+         return connectorParameters;
+      }
+
+      /**
+       * @return the username, or {@code null} if absent or blank
+       */
+      public String getUsername()
+      {
+         return username;
+      }
+
+      /**
+       * @return the password, or {@code null} if absent or blank
+       */
+      public String getPassword()
+      {
+         return password;
+      }
+   }
+}