You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2014/12/10 13:52:47 UTC

[1/3] activemq-6 git commit: ActiveMQ6-6 Factor out WildFly XA Recovery

Repository: activemq-6
Updated Branches:
  refs/heads/master 09a6fee02 -> 2d69d9dc7


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java
new file mode 100644
index 0000000..b51cf24
--- /dev/null
+++ b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceWrapper.java
@@ -0,0 +1,534 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.service.extensions.xa.recovery;
+
+import java.util.Arrays;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.ActiveMQExceptionType;
+import org.apache.activemq.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.api.core.client.ActiveMQClient;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.api.core.client.SessionFailureListener;
+
+/**
+ * XAResourceWrapper.
+ *
+ * Mainly from org.jboss.server.XAResourceWrapper from the JBoss AS server module
+ *
+ * The reason why we don't use that class directly is that it assumes that, on connection failure,
+ * XAER_RMFAIL or XAER_RMERR is thrown, but in ActiveMQ we throw XA_RETRY since we want the recovery manager to be
+ * able to retry on failure without having to retry manually
+ *
+ * @author <a href="adrian@jboss.com">Adrian Brock</a>
+ * @author <a href="tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *
+ * @version $Revision: 45341 $
+ */
+public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureListener
+{
+   /** The state lock */
+   private static final Object lock = new Object();
+
+   private ServerLocator serverLocator;
+
+   private ClientSessionFactory csf;
+
+   private ClientSession delegate;
+
+   private XARecoveryConfig[] xaRecoveryConfigs;
+
+   public ActiveMQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
+   {
+      this.xaRecoveryConfigs = xaRecoveryConfigs;
+
+      if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.debug("Recovery configured with " + Arrays.toString(xaRecoveryConfigs) +
+            ", instance=" +
+            System.identityHashCode(this));
+      }
+   }
+
+   public Xid[] recover(final int flag) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+
+      if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.debug("looking for recover at " + xaResource + " configuration " + Arrays.toString(this.xaRecoveryConfigs));
+      }
+
+      try
+      {
+         Xid[] xids = xaResource.recover(flag);
+
+         if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled() && xids != null && xids.length > 0)
+         {
+            ActiveMQXARecoveryLogger.LOGGER.debug("Recovering these following IDs " + Arrays.toString(xids) + " at " + this);
+         }
+
+         return xids;
+      }
+      catch (XAException e)
+      {
+         ActiveMQXARecoveryLogger.LOGGER.xaRecoverError(e);
+         throw check(e);
+      }
+   }
+
+   public void commit(final Xid xid, final boolean onePhase) throws XAException
+   {
+      XAResource xaResource = getDelegate(true);
+      if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.debug("Commit " + xaResource + " xid " + " onePhase=" + onePhase);
+      }
+      try
+      {
+         xaResource.commit(xid, onePhase);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void rollback(final Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate(true);
+      if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.debug("Rollback " + xaResource + " xid ");
+      }
+      try
+      {
+         xaResource.rollback(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void forget(final Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.debug("Forget " + xaResource + " xid ");
+      }
+
+      try
+      {
+         xaResource.forget(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public boolean isSameRM(XAResource xaRes) throws XAException
+   {
+      if (xaRes instanceof ActiveMQXAResourceWrapper)
+      {
+         xaRes = ((ActiveMQXAResourceWrapper)xaRes).getDelegate(false);
+      }
+
+      XAResource xaResource = getDelegate(false);
+      try
+      {
+         return xaResource.isSameRM(xaRes);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public int prepare(final Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate(true);
+      if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.debug("prepare " + xaResource + " xid ");
+      }
+      try
+      {
+         return xaResource.prepare(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void start(final Xid xid, final int flags) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.debug("start " + xaResource + " xid ");
+      }
+      try
+      {
+         xaResource.start(xid, flags);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void end(final Xid xid, final int flags) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.debug("end " + xaResource + " xid ");
+      }
+      try
+      {
+         xaResource.end(xid, flags);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public int getTransactionTimeout() throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.debug("getTransactionTimeout " + xaResource + " xid ");
+      }
+      try
+      {
+         return xaResource.getTransactionTimeout();
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public boolean setTransactionTimeout(final int seconds) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.debug("setTransactionTimeout " + xaResource + " xid ");
+      }
+      try
+      {
+         return xaResource.setTransactionTimeout(seconds);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void connectionFailed(final ActiveMQException me, boolean failedOver)
+   {
+      if (me.getType() == ActiveMQExceptionType.DISCONNECTED)
+      {
+         if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+         {
+            ActiveMQXARecoveryLogger.LOGGER.debug("being disconnected for server shutdown", me);
+         }
+      }
+      else
+      {
+         ActiveMQXARecoveryLogger.LOGGER.xaRecoverConnectionError(me, csf);
+      }
+      close();
+   }
+
+   @Override
+   public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID)
+   {
+      connectionFailed(me, failedOver);
+   }
+
+   public void beforeReconnect(final ActiveMQException me)
+   {
+   }
+
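+   /**
+    * Get the connectionFactory XAResource, connecting first if necessary
+    * @param retry if true a failure to connect is reported as XA_RETRY, otherwise as XAER_RMERR
+    * @return the connectionFactory XAResource
+    * @throws XAException if no connection could be obtained
+    */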
+   private XAResource getDelegate(boolean retry) throws XAException
+   {
+      XAResource result = null;
+      Exception error = null;
+      try
+      {
+         result = connect();
+      }
+      catch (Exception e)
+      {
+         error = e;
+      }
+
+      if (result == null)
+      {
+         // we should always throw a retry for certain methods (commit etc.); if not, the tx is marked as a heuristic
+         // and all chaos is let loose
+         if (retry)
+         {
+            XAException xae = new XAException("Connection unavailable for xa recovery");
+            xae.errorCode = XAException.XA_RETRY;
+            if (error != null)
+            {
+               xae.initCause(error);
+            }
+            if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+            {
+               ActiveMQXARecoveryLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
+            }
+            throw xae;
+         }
+         else
+         {
+            XAException xae = new XAException("Error trying to connect to any providers for xa recovery");
+            xae.errorCode = XAException.XAER_RMERR;
+            if (error != null)
+            {
+               xae.initCause(error);
+            }
+            if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+            {
+               ActiveMQXARecoveryLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
+            }
+            throw xae;
+         }
+
+      }
+
+      return result;
+   }
+
+   /**
+    * Connect to the server if not already done so
+    *
+    * @return the connectionFactory XAResource
+    * @throws Exception for any problem
+    */
+   protected XAResource connect() throws Exception
+   {
+      // Do we already have a valid connectionFactory?
+      synchronized (ActiveMQXAResourceWrapper.lock)
+      {
+         if (delegate != null)
+         {
+            return delegate;
+         }
+      }
+
+      for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs)
+      {
+
+         if (xaRecoveryConfig == null)
+         {
+            continue;
+         }
+         if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+         {
+            ActiveMQXARecoveryLogger.LOGGER.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs));
+         }
+
+         ClientSession cs = null;
+
+         try
+         {
+            // setting ha=false because otherwise the connector would go towards any server, causing Heuristic exceptions
+            // we really need to control what server it's connected to
+
+            // Manual configuration may still use discovery, so we will keep this
+            if (xaRecoveryConfig.getDiscoveryConfiguration() != null)
+            {
+               serverLocator = ActiveMQClient.createServerLocator(false, xaRecoveryConfig.getDiscoveryConfiguration());
+            }
+            else
+            {
+               serverLocator = ActiveMQClient.createServerLocator(false, xaRecoveryConfig.getTransportConfig());
+            }
+            serverLocator.disableFinalizeCheck();
+            csf = serverLocator.createSessionFactory();
+            if (xaRecoveryConfig.getUsername() == null)
+            {
+               cs = csf.createSession(true, false, false);
+            }
+            else
+            {
+               cs = csf.createSession(xaRecoveryConfig.getUsername(),
+                  xaRecoveryConfig.getPassword(),
+                  true,
+                  false,
+                  false,
+                  false,
+                  1);
+            }
+         }
+         catch (Throwable e)
+         {
+            ActiveMQXARecoveryLogger.LOGGER.xaRecoverAutoConnectionError(e, xaRecoveryConfig);
+            if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
+            {
+               ActiveMQXARecoveryLogger.LOGGER.debug(e.getMessage(), e);
+            }
+
+            try
+            {
+               if (cs != null) cs.close();
+               if (serverLocator != null) serverLocator.close();
+            }
+            catch (Throwable ignored)
+            {
+               if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
+               {
+                  ActiveMQXARecoveryLogger.LOGGER.trace(e.getMessage(), ignored);
+               }
+            }
+            continue;
+         }
+
+         cs.addFailureListener(this);
+
+         synchronized (ActiveMQXAResourceWrapper.lock)
+         {
+            delegate = cs;
+         }
+
+         return delegate;
+      }
+      ActiveMQXARecoveryLogger.LOGGER.recoveryConnectFailed(Arrays.toString(xaRecoveryConfigs));
+      throw new ActiveMQNotConnectedException();
+   }
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "ActiveMQXAResourceWrapper [serverLocator=" + serverLocator +
+         ", csf=" +
+         csf +
+         ", delegate=" +
+         delegate +
+         ", xaRecoveryConfigs=" +
+         Arrays.toString(xaRecoveryConfigs) +
+         ", instance=" +
+         System.identityHashCode(this) +
+         "]";
+   }
+
+   /**
+    * Close the connection
+    */
+   public void close()
+   {
+      ServerLocator oldServerLocator = null;
+      ClientSessionFactory oldCSF = null;
+      ClientSession oldDelegate = null;
+      synchronized (ActiveMQXAResourceWrapper.lock)
+      {
+         oldCSF = csf;
+         csf = null;
+         oldDelegate = delegate;
+         delegate = null;
+         oldServerLocator = serverLocator;
+         serverLocator = null;
+      }
+
+      if (oldDelegate != null)
+      {
+         try
+         {
+            oldDelegate.close();
+         }
+         catch (Throwable ignorable)
+         {
+            ActiveMQXARecoveryLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
+         }
+      }
+
+      if (oldCSF != null)
+      {
+         try
+         {
+            oldCSF.close();
+         }
+         catch (Throwable ignorable)
+         {
+            ActiveMQXARecoveryLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
+         }
+      }
+
+      if (oldServerLocator != null)
+      {
+         try
+         {
+            oldServerLocator.close();
+         }
+         catch (Throwable ignorable)
+         {
+            ActiveMQXARecoveryLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
+         }
+      }
+   }
+
+   /**
+    * Log an XAException and close the connection, whatever the error,
+    * so the next call will reconnect and start fresh.
+    *
+    * @param e the xa exception
+    * @return never
+    * @throws XAException always
+    */
+   protected XAException check(final XAException e) throws XAException
+   {
+      ActiveMQXARecoveryLogger.LOGGER.xaRecoveryError(e);
+
+
+      // If any exception happened, we close the connection so we may start fresh
+      close();
+      throw e;
+   }
+
+   @Override
+   protected void finalize() throws Throwable
+   {
+      close();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/XARecoveryConfig.java
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/XARecoveryConfig.java b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/XARecoveryConfig.java
new file mode 100644
index 0000000..5225528
--- /dev/null
+++ b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/XARecoveryConfig.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.service.extensions.xa.recovery;
+
+import java.util.Arrays;
+
+import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ActiveMQClient;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
+
+/**
+ *
+ * This represents the configuration of a single connection factory.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * @author Clebert Suconic
+ *
+ * A wrapper around info needed for the xa recovery resource
+ *         Date: 3/23/11
+ *         Time: 10:15 AM
+ */
+public class XARecoveryConfig
+{
+
+   private final boolean ha;
+   private final TransportConfiguration[] transportConfiguration;
+   private final DiscoveryGroupConfiguration discoveryConfiguration;
+   private final String username;
+   private final String password;
+
+   public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory,
+                                            String userName,
+                                            String password)
+   {
+      if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null)
+      {
+         return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password);
+      }
+      else
+      {
+         return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password);
+      }
+
+   }
+
+   public XARecoveryConfig(final boolean ha, final TransportConfiguration[] transportConfiguration, final String username, final String password)
+   {
+      this.transportConfiguration = transportConfiguration;
+      this.discoveryConfiguration = null;
+      this.username = username;
+      this.password = password;
+      this.ha = ha;
+   }
+
+   public XARecoveryConfig(final boolean ha, final DiscoveryGroupConfiguration discoveryConfiguration, final String username, final String password)
+   {
+      this.discoveryConfiguration = discoveryConfiguration;
+      this.transportConfiguration = null;
+      this.username = username;
+      this.password = password;
+      this.ha = ha;
+   }
+
+   public boolean isHA()
+   {
+      return ha;
+   }
+
+   public DiscoveryGroupConfiguration getDiscoveryConfiguration()
+   {
+      return discoveryConfiguration;
+   }
+
+   public TransportConfiguration[] getTransportConfig()
+   {
+      return transportConfiguration;
+   }
+
+   public String getUsername()
+   {
+      return username;
+   }
+
+   public String getPassword()
+   {
+      return password;
+   }
+
+
+   /**
+    * Create a serverLocator using the configuration
+    * @return locator
+    */
+   public ServerLocator createServerLocator()
+   {
+      if (getDiscoveryConfiguration() != null)
+      {
+         return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration());
+      }
+      else
+      {
+         return ActiveMQClient.createServerLocator(isHA(), getTransportConfig());
+      }
+
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((discoveryConfiguration == null) ? 0 : discoveryConfiguration.hashCode());
+      result = prime * result + Arrays.hashCode(transportConfiguration);
+      return result;
+   }
+
+   /*
+    * We don't use username and password on purpose.
+    * Just having the connector is enough, as we don't want to duplicate resources just because of usernames
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (obj == null)
+         return false;
+      if (getClass() != obj.getClass())
+         return false;
+      XARecoveryConfig other = (XARecoveryConfig)obj;
+      if (discoveryConfiguration == null)
+      {
+         if (other.discoveryConfiguration != null)
+            return false;
+      }
+      else if (!discoveryConfiguration.equals(other.discoveryConfiguration))
+         return false;
+      if (!Arrays.equals(transportConfiguration, other.transportConfiguration))
+         return false;
+      return true;
+   }
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "XARecoveryConfig [transportConfiguration = " + Arrays.toString(transportConfiguration) +
+             ", discoveryConfiguration = " + discoveryConfiguration +
+             ", username=" +
+             username +
+             ", password=****]";
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/tests/byteman-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/pom.xml b/tests/byteman-tests/pom.xml
index 02e9d1b..0388dc8 100644
--- a/tests/byteman-tests/pom.xml
+++ b/tests/byteman-tests/pom.xml
@@ -128,8 +128,8 @@
       </dependency>
       <!--this specifically for the JMS Bridge -->
       <dependency>
-         <groupId>org.jboss</groupId>
-         <artifactId>jboss-transaction-spi</artifactId>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-jta_1.1_spec</artifactId>
       </dependency>
       <dependency>
          <groupId>org.apache.geronimo.components</groupId>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ResourceAdapterTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ResourceAdapterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ResourceAdapterTest.java
index d9f7c69..74f2ef1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ResourceAdapterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ResourceAdapterTest.java
@@ -22,13 +22,11 @@ import javax.resource.spi.endpoint.MessageEndpoint;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
-import org.apache.activemq.api.core.TransportConfiguration;
 import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
 import org.apache.activemq.api.core.client.ClientSession;
 import org.apache.activemq.api.core.client.ClientSessionFactory;
@@ -38,11 +36,10 @@ import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.jms.client.ActiveMQDestination;
-import org.apache.activemq.jms.server.recovery.RecoveryDiscovery;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
 import org.apache.activemq.ra.ActiveMQResourceAdapter;
 import org.apache.activemq.ra.inflow.ActiveMQActivation;
 import org.apache.activemq.ra.inflow.ActiveMQActivationSpec;
+import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
 import org.apache.activemq.tests.unit.ra.MessageEndpointFactory;
 import org.apache.activemq.tests.util.UnitTestCase;
 import org.apache.activemq.utils.DefaultSensitiveStringCodec;
@@ -707,22 +704,6 @@ public class ResourceAdapterTest extends ActiveMQRATestBase
       assertTrue(endpoint.released);
    }
 
-   @Test
-   public void testRecoveryDiscoveryAsKey() throws Exception
-   {
-      Set<RecoveryDiscovery> discoverySet = new HashSet<RecoveryDiscovery>();
-      String factClass = "org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory";
-      TransportConfiguration transportConfig = new TransportConfiguration(factClass, null, "netty");
-      XARecoveryConfig config = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig},
-                                                     null, null);
-
-      RecoveryDiscovery discovery1 = new RecoveryDiscovery(config);
-      RecoveryDiscovery discovery2 = new RecoveryDiscovery(config);
-      assertTrue(discoverySet.add(discovery1));
-      assertFalse(discoverySet.add(discovery2));
-      assertEquals("should have only one in the set", 1, discoverySet.size());
-   }
-
    @Override
    public boolean useSecurity()
    {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/tests/joram-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/joram-tests/pom.xml b/tests/joram-tests/pom.xml
index b157500..fe78bbc 100644
--- a/tests/joram-tests/pom.xml
+++ b/tests/joram-tests/pom.xml
@@ -76,8 +76,8 @@
       </dependency>
       <!--this specifically for the JMS Bridge -->
       <dependency>
-         <groupId>org.jboss</groupId>
-         <artifactId>jboss-transaction-spi</artifactId>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-jta_1.1_spec</artifactId>
       </dependency>
       <dependency>
          <groupId>org.apache.geronimo.components</groupId>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/tests/performance-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/performance-tests/pom.xml b/tests/performance-tests/pom.xml
index ea82e65..224694f 100644
--- a/tests/performance-tests/pom.xml
+++ b/tests/performance-tests/pom.xml
@@ -90,8 +90,8 @@
       </dependency>
       <!--this specifically for the JMS Bridge-->
       <dependency>
-         <groupId>org.jboss</groupId>
-         <artifactId>jboss-transaction-spi</artifactId>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-jta_1.1_spec</artifactId>
       </dependency>
       <dependency>
          <groupId>org.apache.geronimo.components</groupId>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/tests/soak-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml
index b876dc1..7846310 100644
--- a/tests/soak-tests/pom.xml
+++ b/tests/soak-tests/pom.xml
@@ -109,8 +109,8 @@
       </dependency>
       <!--this specifically for the JMS Bridge-->
       <dependency>
-         <groupId>org.jboss</groupId>
-         <artifactId>jboss-transaction-spi</artifactId>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-jta_1.1_spec</artifactId>
       </dependency>
       <dependency>
          <groupId>org.apache.geronimo.components</groupId>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/tests/stress-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/stress-tests/pom.xml b/tests/stress-tests/pom.xml
index 9918a61..9d212c0 100644
--- a/tests/stress-tests/pom.xml
+++ b/tests/stress-tests/pom.xml
@@ -109,8 +109,8 @@
       </dependency>
       <!--this specifically for the JMS Bridge-->
       <dependency>
-         <groupId>org.jboss</groupId>
-         <artifactId>jboss-transaction-spi</artifactId>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-jta_1.1_spec</artifactId>
       </dependency>
       <dependency>
          <groupId>org.apache.geronimo.components</groupId>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/tests/timing-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/timing-tests/pom.xml b/tests/timing-tests/pom.xml
index 3b1fb1a..b83d46a 100644
--- a/tests/timing-tests/pom.xml
+++ b/tests/timing-tests/pom.xml
@@ -87,8 +87,8 @@
       </dependency>
       <!--this specifically for the JMS Bridge-->
       <dependency>
-         <groupId>org.jboss</groupId>
-         <artifactId>jboss-transaction-spi</artifactId>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-jta_1.1_spec</artifactId>
       </dependency>
       <dependency>
          <groupId>org.apache.geronimo.components</groupId>


[2/3] activemq-6 git commit: ActiveMQ6-6 Factor out WildFly XA Recovery

Posted by an...@apache.org.
ActiveMQ6-6 Factor out WildFly XA Recovery

Pulls out WildFly XA Recovery specifics into a different project.  Some
XA recovery code is still present and is used as integration points for
integrating TM XA recovery processes.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/8f91af1b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/8f91af1b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/8f91af1b

Branch: refs/heads/master
Commit: 8f91af1b5cdf6ce6b0e4572919a9b7653ef05da9
Parents: d7c7d86
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Dec 2 12:58:08 2014 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Dec 9 16:08:34 2014 +0000

----------------------------------------------------------------------
 activemq-jms-server/pom.xml                     |  11 +-
 .../activemq/jms/bridge/impl/JMSBridgeImpl.java |  18 +-
 .../jms/server/ActiveMQJMSServerLogger.java     |  11 -
 .../recovery/ActiveMQRecoveryRegistry.java      | 254 ---------
 .../server/recovery/ActiveMQRegistryBase.java   |  75 ---
 .../recovery/ActiveMQXAResourceRecovery.java    | 235 --------
 .../recovery/ActiveMQXAResourceWrapper.java     | 535 -------------------
 .../jms/server/recovery/RecoveryDiscovery.java  | 236 --------
 .../jms/server/recovery/XARecoveryConfig.java   | 171 ------
 .../jms/server/recovery/package-info.java       |  18 -
 .../ra/ActiveMQRAManagedConnectionFactory.java  |   2 +-
 .../activemq/ra/inflow/ActiveMQActivation.java  |   2 +-
 .../activemq/ra/recovery/RecoveryManager.java   |  18 +-
 activemq-service-extensions/pom.xml             |   5 +
 .../xa/recovery/ActiveMQRegistry.java           |  33 ++
 .../xa/recovery/ActiveMQRegistryImpl.java       |  60 +++
 .../xa/recovery/ActiveMQXARecoveryLogger.java   | 118 ++++
 .../xa/recovery/ActiveMQXAResourceRecovery.java | 233 ++++++++
 .../xa/recovery/ActiveMQXAResourceWrapper.java  | 534 ++++++++++++++++++
 .../xa/recovery/XARecoveryConfig.java           | 171 ++++++
 tests/byteman-tests/pom.xml                     |   4 +-
 .../integration/ra/ResourceAdapterTest.java     |  21 +-
 tests/joram-tests/pom.xml                       |   4 +-
 tests/performance-tests/pom.xml                 |   4 +-
 tests/soak-tests/pom.xml                        |   4 +-
 tests/stress-tests/pom.xml                      |   4 +-
 tests/timing-tests/pom.xml                      |   4 +-
 27 files changed, 1203 insertions(+), 1582 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-jms-server/pom.xml b/activemq-jms-server/pom.xml
index 73215ae..246dea3 100644
--- a/activemq-jms-server/pom.xml
+++ b/activemq-jms-server/pom.xml
@@ -50,13 +50,14 @@
          <artifactId>geronimo-ejb_3.0_spec</artifactId>
       </dependency>
       <dependency>
-         <groupId>org.jboss.jbossts.jts</groupId>
-         <artifactId>jbossjts-jacorb</artifactId>
-         <optional>true</optional>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-jta_1.1_spec</artifactId>
+         <version>1.1.1</version>
       </dependency>
       <dependency>
-         <groupId>org.jboss</groupId>
-         <artifactId>jboss-transaction-spi</artifactId>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-service-extensions</artifactId>
+         <version>${project.version}</version>
       </dependency>
    </dependencies>
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
index 169bce8..fed9420 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
@@ -44,6 +44,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.ServiceLoader;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -63,9 +64,10 @@ import org.apache.activemq.jms.client.ActiveMQConnection;
 import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.jms.client.ActiveMQMessage;
 import org.apache.activemq.jms.server.ActiveMQJMSServerBundle;
-import org.apache.activemq.jms.server.recovery.ActiveMQRegistryBase;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
 import org.apache.activemq.service.extensions.ServiceUtils;
+import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry;
+import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistryImpl;
+import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
 import org.apache.activemq.utils.ClassloadingUtil;
 import org.apache.activemq.utils.DefaultSensitiveStringCodec;
 import org.apache.activemq.utils.PasswordMaskingUtil;
@@ -183,7 +185,7 @@ public final class JMSBridgeImpl implements JMSBridge
 
    private static final int FORWARD_MODE_NONTX = 2;
 
-   private ActiveMQRegistryBase registry;
+   private ActiveMQRegistry registry;
 
    /*
     * Constructor for MBean
@@ -2228,7 +2230,15 @@ public final class JMSBridgeImpl implements JMSBridge
          {
             try
             {
-               registry = (ActiveMQRegistryBase) safeInitNewInstance(locatorClasse);
+               ServiceLoader<ActiveMQRegistry> sl = ServiceLoader.load(ActiveMQRegistry.class);
+               if (sl.iterator().hasNext())
+               {
+                  registry = sl.iterator().next();
+               }
+               else
+               {
+                  registry = ActiveMQRegistryImpl.getInstance();
+               }
             }
             catch (Throwable e)
             {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/ActiveMQJMSServerLogger.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/ActiveMQJMSServerLogger.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/ActiveMQJMSServerLogger.java
index 1fbd0b7..eb97abb 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/ActiveMQJMSServerLogger.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/ActiveMQJMSServerLogger.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.jms.server;
 
 import org.apache.activemq.api.core.client.ClientSessionFactory;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
 import org.jboss.logging.BasicLogger;
 import org.jboss.logging.Logger;
 import org.jboss.logging.annotations.Cause;
@@ -90,11 +89,6 @@ public interface ActiveMQJMSServerLogger extends BasicLogger
             format = Message.Format.MESSAGE_FORMAT)
    void xaRecoverConnectionError(@Cause Exception e, ClientSessionFactory csf);
 
-   @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 122015, value = "Can not connect to {0} on auto-generated resource recovery",
-            format = Message.Format.MESSAGE_FORMAT)
-   void xaRecoverAutoConnectionError(@Cause Throwable e, XARecoveryConfig csf);
-
    @LogMessage(level = Logger.Level.DEBUG)
    @Message(id = 122016, value = "Error in XA Recovery" , format = Message.Format.MESSAGE_FORMAT)
    void xaRecoveryError(@Cause Exception e);
@@ -104,11 +98,6 @@ public interface ActiveMQJMSServerLogger extends BasicLogger
             format = Message.Format.MESSAGE_FORMAT)
    void failedToCorrectHost(@Cause Exception e, String name);
 
-   @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 122018, value = "Could not start recovery discovery on {0}, we will retry every recovery scan until the server is available",
-            format = Message.Format.MESSAGE_FORMAT)
-   void xaRecoveryStartError(XARecoveryConfig e);
-
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 124000, value = "key attribute missing for JMS configuration {0}" , format = Message.Format.MESSAGE_FORMAT)
    void jmsConfigMissingKey(Node e);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java
deleted file mode 100644
index f0fd2e7..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import javax.transaction.xa.XAResource;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.activemq.api.core.Pair;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
-import org.jboss.tm.XAResourceRecovery;
-
-/**
- * <p>This class is used by the Resource Adapter to register RecoveryDiscovery, which is based on the {@link XARecoveryConfig}</p>
- * <p>Each outbound or inboud connection will pass the configuration here through by calling the method {@link ActiveMQRecoveryRegistry#register(XARecoveryConfig)}</p>
- * <p>Later the {@link RecoveryDiscovery} will call {@link ActiveMQRecoveryRegistry#nodeUp(String, Pair, String, String)}
- * so we will keep a track of nodes on the cluster
- * or nodes where this server is connected to. </p>
- *
- * @author clebertsuconic
- */
-public class ActiveMQRecoveryRegistry implements XAResourceRecovery
-{
-
-   private static final ActiveMQRecoveryRegistry theInstance = new ActiveMQRecoveryRegistry();
-
-   private final ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery> configSet = new ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery>();
-
-   /**
-    * The list by server id and resource adapter wrapper, what will actually be calling recovery.
-    * This will be returned by getXAResources
-    */
-   private final ConcurrentHashMap<String, ActiveMQXAResourceWrapper> recoveries = new ConcurrentHashMap<String, ActiveMQXAResourceWrapper>();
-
-   /**
-    * In case of failures, we retry on the next getXAResources
-    */
-   private final Set<RecoveryDiscovery> failedDiscoverySet = new HashSet<RecoveryDiscovery>();
-
-   private ActiveMQRecoveryRegistry()
-   {
-   }
-
-   /**
-    * This will be called periodically by the Transaction Manager
-    */
-   public XAResource[] getXAResources()
-   {
-      try
-      {
-         checkFailures();
-
-         ActiveMQXAResourceWrapper[] resourceArray = new ActiveMQXAResourceWrapper[recoveries.size()];
-         resourceArray = recoveries.values().toArray(resourceArray);
-
-         if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-         {
-            ActiveMQJMSServerLogger.LOGGER.debug("\n=======================================================================================");
-            ActiveMQJMSServerLogger.LOGGER.debug("Returning the following list on getXAREsources:");
-            for (Map.Entry<String, ActiveMQXAResourceWrapper> entry : recoveries.entrySet())
-            {
-               ActiveMQJMSServerLogger.LOGGER.debug("server-id=" + entry.getKey() + ", value=" + entry.getValue());
-            }
-            ActiveMQJMSServerLogger.LOGGER.debug("=======================================================================================\n");
-         }
-
-         return resourceArray;
-      }
-      catch (Throwable e)
-      {
-         ActiveMQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
-         return new XAResource[]{};
-      }
-   }
-
-   public static ActiveMQRecoveryRegistry getInstance()
-   {
-      return theInstance;
-   }
-
-   /**
-    * This will be called by then resource adapters, to register a new discovery
-    *
-    * @param resourceConfig
-    */
-   public void register(final XARecoveryConfig resourceConfig)
-   {
-      RecoveryDiscovery newInstance = new RecoveryDiscovery(resourceConfig);
-      RecoveryDiscovery discoveryRecord = configSet.putIfAbsent(resourceConfig, newInstance);
-      if (discoveryRecord == null)
-      {
-         discoveryRecord = newInstance;
-         discoveryRecord.start(false);
-      }
-      // you could have a configuration shared with multiple MDBs or RAs
-      discoveryRecord.incrementUsage();
-   }
-
-   /**
-    * Reference counts and deactivate a configuration
-    * Notice: this won't remove the servers since a server may have previous XIDs
-    *
-    * @param resourceConfig
-    */
-   public void unRegister(final XARecoveryConfig resourceConfig)
-   {
-      RecoveryDiscovery discoveryRecord = configSet.get(resourceConfig);
-      if (discoveryRecord != null && discoveryRecord.decrementUsage() == 0)
-      {
-         discoveryRecord = configSet.remove(resourceConfig);
-         if (discoveryRecord != null)
-         {
-            discoveryRecord.stop();
-         }
-      }
-   }
-
-   /**
-    * We need to make sure that all resources are closed, we don't actually do this when a resourceConfig is closed but
-    * maybe we should.
-    */
-   public void stop()
-   {
-      for (RecoveryDiscovery recoveryDiscovery : configSet.values())
-      {
-         recoveryDiscovery.stop();
-      }
-      for (ActiveMQXAResourceWrapper activeMQXAResourceWrapper : recoveries.values())
-      {
-         activeMQXAResourceWrapper.close();
-      }
-      recoveries.clear();
-      configSet.clear();
-   }
-
-   /**
-    * in case of a failure the Discovery will register itslef to retry
-    *
-    * @param failedDiscovery
-    */
-   public void failedDiscovery(RecoveryDiscovery failedDiscovery)
-   {
-      ActiveMQJMSServerLogger.LOGGER.debug("RecoveryDiscovery being set to restart:" + failedDiscovery);
-      synchronized (failedDiscoverySet)
-      {
-         failedDiscoverySet.add(failedDiscovery);
-      }
-   }
-
-   /**
-    * @param nodeID
-    * @param networkConfiguration
-    * @param username
-    * @param password
-    */
-   public void nodeUp(String nodeID,
-                      Pair<TransportConfiguration, TransportConfiguration> networkConfiguration,
-                      String username,
-                      String password)
-   {
-
-      if (recoveries.get(nodeID) == null)
-      {
-         if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-         {
-            ActiveMQJMSServerLogger.LOGGER.debug(nodeID + " being registered towards " + networkConfiguration);
-         }
-         XARecoveryConfig config = new XARecoveryConfig(true,
-                                                        extractTransportConfiguration(networkConfiguration),
-                                                        username,
-                                                        password);
-
-         ActiveMQXAResourceWrapper wrapper = new ActiveMQXAResourceWrapper(config);
-         recoveries.putIfAbsent(nodeID, wrapper);
-      }
-   }
-
-   public void nodeDown(String nodeID)
-   {
-   }
-
-   /**
-    * this will go through the list of retries
-    */
-   private void checkFailures()
-   {
-      final HashSet<RecoveryDiscovery> failures = new HashSet<RecoveryDiscovery>();
-
-      // it will transfer all the discoveries to a new collection
-      synchronized (failedDiscoverySet)
-      {
-         failures.addAll(failedDiscoverySet);
-         failedDiscoverySet.clear();
-      }
-
-      if (failures.size() > 0)
-      {
-         // This shouldn't happen on a regular scenario, however when this retry happens this needs
-         // to be done on a new thread
-         Thread t = new Thread("ActiveMQ Recovery Discovery Reinitialization")
-         {
-            @Override
-            public void run()
-            {
-               for (RecoveryDiscovery discovery : failures)
-               {
-                  try
-                  {
-                     ActiveMQJMSServerLogger.LOGGER.debug("Retrying discovery " + discovery);
-                     discovery.start(true);
-                  }
-                  catch (Throwable e)
-                  {
-                     ActiveMQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
-                  }
-               }
-            }
-         };
-
-         t.start();
-      }
-   }
-
-   /**
-    * @param networkConfiguration
-    * @return
-    */
-   private TransportConfiguration[] extractTransportConfiguration(Pair<TransportConfiguration, TransportConfiguration> networkConfiguration)
-   {
-      if (networkConfiguration.getB() != null)
-      {
-         return new TransportConfiguration[]{networkConfiguration.getA(), networkConfiguration.getB()};
-      }
-      return new TransportConfiguration[]{networkConfiguration.getA()};
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java
deleted file mode 100644
index 1ed0104..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.jboss.tm.XAResourceRecoveryRegistry;
-
-/**
- * This class is a base class for the integration layer where
- * This class is used on integration points and this is just a bridge to the real registry at
- * {@link ActiveMQRecoveryRegistry}
- *
- * @author Clebert
- *
- *
- */
-public abstract class ActiveMQRegistryBase
-{
-
-   private final AtomicBoolean started = new AtomicBoolean(false);
-
-   public ActiveMQRegistryBase()
-   {
-   }
-
-
-   public abstract XAResourceRecoveryRegistry getTMRegistry();
-
-   public void register(final XARecoveryConfig resourceConfig)
-   {
-      init();
-      ActiveMQRecoveryRegistry.getInstance().register(resourceConfig);
-   }
-
-
-
-   public void unRegister(final XARecoveryConfig resourceConfig)
-   {
-      init();
-      ActiveMQRecoveryRegistry.getInstance().unRegister(resourceConfig);
-   }
-
-   public void stop()
-   {
-      if (started.compareAndSet(true, false) && getTMRegistry() != null)
-      {
-         getTMRegistry().removeXAResourceRecovery(ActiveMQRecoveryRegistry.getInstance());
-         ActiveMQRecoveryRegistry.getInstance().stop();
-      }
-   }
-
-   private void init()
-   {
-      if (started.compareAndSet(false, true) && getTMRegistry() != null)
-      {
-         getTMRegistry().addXAResourceRecovery(ActiveMQRecoveryRegistry.getInstance());
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java
deleted file mode 100644
index 4f85925..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import javax.transaction.xa.XAResource;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.arjuna.ats.jta.recovery.XAResourceRecovery;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
-
-/**
- * A XAResourceRecovery instance that can be used to recover any JMS provider.
- * <p>
- * In reality only recover, rollback and commit will be called but we still need to be implement all
- * methods just in case.
- * <p>
- * To enable this add the following to the jbossts-properties file
- * <pre>
- * &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ1"
- *                 value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory"/&gt;
- * </pre>
- * <p>
- * you'll need something like this if the ActiveMQ Server is remote
- * <pre>
- *      &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2"
- *                  value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/&gt;
- * </pre>
- * <p>
- * you'll need something like this if the ActiveMQ Server is remote and has failover configured
- * <pre>
- *             &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2"
- *                       value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/&gt;
- * </pre>
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @version <tt>$Revision: 1.1 $</tt>
- */
-public class ActiveMQXAResourceRecovery implements XAResourceRecovery
-{
-   private final boolean trace = ActiveMQJMSServerLogger.LOGGER.isTraceEnabled();
-
-   private boolean hasMore;
-
-   private ActiveMQXAResourceWrapper res;
-
-   public ActiveMQXAResourceRecovery()
-   {
-      if (trace)
-      {
-         ActiveMQJMSServerLogger.LOGGER.trace("Constructing ActiveMQXAResourceRecovery");
-      }
-   }
-
-   public boolean initialise(final String config)
-   {
-      if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.trace(this + " intialise: " + config);
-      }
-
-      String[] configs = config.split(";");
-      XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
-      for (int i = 0, configsLength = configs.length; i < configsLength; i++)
-      {
-         String s = configs[i];
-         ConfigParser parser = new ConfigParser(s);
-         String connectorFactoryClassName = parser.getConnectorFactoryClassName();
-         Map<String, Object> connectorParams = parser.getConnectorParameters();
-         String username = parser.getUsername();
-         String password = parser.getPassword();
-         TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
-         xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password);
-      }
-
-
-      res = new ActiveMQXAResourceWrapper(xaRecoveryConfigs);
-
-      if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.trace(this + " initialised");
-      }
-
-      return true;
-   }
-
-   public boolean hasMoreResources()
-   {
-      if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.trace(this + " hasMoreResources");
-      }
-
-      /*
-       * The way hasMoreResources is supposed to work is as follows:
-       * For each "sweep" the recovery manager will call hasMoreResources, then if it returns
-       * true it will call getXAResource.
-       * It will repeat that until hasMoreResources returns false.
-       * Then the sweep is over.
-       * For the next sweep hasMoreResources should return true, etc.
-       *
-       * In our case where we only need to return one XAResource per sweep,
-       * hasMoreResources should basically alternate between true and false.
-       *
-       *
-       */
-
-      hasMore = !hasMore;
-
-      return hasMore;
-   }
-
-   public XAResource getXAResource()
-   {
-      if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.trace(this + " getXAResource");
-      }
-
-      return res;
-   }
-
-   public XAResource[] getXAResources()
-   {
-      return new XAResource[]{res};
-   }
-
-   @Override
-   protected void finalize()
-   {
-      res.close();
-   }
-
-   public static class ConfigParser
-   {
-      private final String connectorFactoryClassName;
-
-      private final Map<String, Object> connectorParameters;
-
-      private String username;
-
-      private String password;
-
-      public ConfigParser(final String config)
-      {
-         if (config == null || config.length() == 0)
-         {
-            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
-         }
-
-         String[] strings = config.split(",");
-
-         // First (mandatory) param is the connector factory class name
-         if (strings.length < 1)
-         {
-            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
-         }
-
-         connectorFactoryClassName = strings[0].trim();
-
-         // Next two (optional) parameters are the username and password to use for creating the session for recovery
-
-         if (strings.length >= 2)
-         {
-
-            username = strings[1].trim();
-            if (username.length() == 0)
-            {
-               username = null;
-            }
-
-            if (strings.length == 2)
-            {
-               throw new IllegalArgumentException("If username is specified, password must be specified too");
-            }
-
-            password = strings[2].trim();
-            if (password.length() == 0)
-            {
-               password = null;
-            }
-         }
-
-         // other tokens are for connector configurations
-         connectorParameters = new HashMap<String, Object>();
-         if (strings.length >= 3)
-         {
-            for (int i = 3; i < strings.length; i++)
-            {
-               String[] str = strings[i].split("=");
-               if (str.length == 2)
-               {
-                  connectorParameters.put(str[0].trim(), str[1].trim());
-               }
-            }
-         }
-      }
-
-      public String getConnectorFactoryClassName()
-      {
-         return connectorFactoryClassName;
-      }
-
-      public Map<String, Object> getConnectorParameters()
-      {
-         return connectorParameters;
-      }
-
-      public String getUsername()
-      {
-         return username;
-      }
-
-      public String getPassword()
-      {
-         return password;
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java
deleted file mode 100644
index a7841ba..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java
+++ /dev/null
@@ -1,535 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import java.util.Arrays;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.api.core.ActiveMQExceptionType;
-import org.apache.activemq.api.core.ActiveMQNotConnectedException;
-import org.apache.activemq.api.core.client.ClientSession;
-import org.apache.activemq.api.core.client.ClientSessionFactory;
-import org.apache.activemq.api.core.client.ActiveMQClient;
-import org.apache.activemq.api.core.client.ServerLocator;
-import org.apache.activemq.api.core.client.SessionFailureListener;
-import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
-
-/**
- * XAResourceWrapper.
- *
- * Mainly from org.jboss.server.XAResourceWrapper from the JBoss AS server module
- *
- * The reason why we don't use that class directly is that it assumes on failure of connection
- * the RM_FAIL or RM_ERR is thrown, but in ActiveMQ we throw XA_RETRY since we want the recovery manager to be able
- * to retry on failure without having to manually retry
- *
- * @author <a href="adrian@jboss.com">Adrian Brock</a>
- * @author <a href="tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- *
- * @version $Revision: 45341 $
- */
-public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureListener
-{
-   /** The state lock (shared by all instances); serverLocator, csf and delegate are swapped while holding it */
-   private static final Object lock = new Object();
-
-   private ServerLocator serverLocator;
-
-   private ClientSessionFactory csf;
-
-   private ClientSession delegate;
-
-   private XARecoveryConfig[] xaRecoveryConfigs;
-
-   /**
-    * Creates a wrapper that connects lazily, trying the given configurations in order.
-    *
-    * @param xaRecoveryConfigs the candidate recovery configurations; null entries are skipped when connecting
-    */
-   public ActiveMQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
-   {
-      this.xaRecoveryConfigs = xaRecoveryConfigs;
-
-      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("Recovery configured with " + Arrays.toString(xaRecoveryConfigs) +
-            ", instance=" +
-            System.identityHashCode(this));
-      }
-   }
-
-   /**
-    * Delegates to the connected session's <code>recover</code>.
-    *
-    * @param flag the scan flag passed through to the delegate
-    * @return the xids reported by the delegate
-    * @throws XAException if no connection can be made (XAER_RMERR) or the delegate fails; on a delegate
-    *                     failure the connection is closed so that the next call reconnects
-    */
-   @Override
-   public Xid[] recover(final int flag) throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-
-      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("looking for recover at " + xaResource + " configuration " + Arrays.toString(this.xaRecoveryConfigs));
-      }
-
-      try
-      {
-         Xid[] xids = xaResource.recover(flag);
-
-         if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled() && xids != null && xids.length > 0)
-         {
-            ActiveMQJMSServerLogger.LOGGER.debug("Recovering these following IDs " + Arrays.toString(xids) + " at " + this);
-         }
-
-         return xids;
-      }
-      catch (XAException e)
-      {
-         ActiveMQJMSServerLogger.LOGGER.xaRecoverError(e);
-         throw check(e);
-      }
-   }
-
-   /**
-    * Delegates the commit to the connected session; a missing connection is reported as XA_RETRY.
-    *
-    * @param xid      the transaction branch to commit
-    * @param onePhase passed through to the delegate
-    * @throws XAException if no connection can be made (XA_RETRY) or the delegate fails
-    */
-   @Override
-   public void commit(final Xid xid, final boolean onePhase) throws XAException
-   {
-      XAResource xaResource = getDelegate(true);
-      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("Commit " + xaResource + " xid " + xid + " onePhase=" + onePhase);
-      }
-      try
-      {
-         xaResource.commit(xid, onePhase);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   /**
-    * Delegates the rollback to the connected session; a missing connection is reported as XA_RETRY.
-    *
-    * @param xid the transaction branch to roll back
-    * @throws XAException if no connection can be made (XA_RETRY) or the delegate fails
-    */
-   @Override
-   public void rollback(final Xid xid) throws XAException
-   {
-      XAResource xaResource = getDelegate(true);
-      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("Rollback " + xaResource + " xid " + xid);
-      }
-      try
-      {
-         xaResource.rollback(xid);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   /**
-    * Delegates <code>forget</code> to the connected session.
-    *
-    * @param xid the transaction branch to forget
-    * @throws XAException if no connection can be made (XAER_RMERR) or the delegate fails
-    */
-   @Override
-   public void forget(final Xid xid) throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("Forget " + xaResource + " xid " + xid);
-      }
-
-      try
-      {
-         xaResource.forget(xid);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   /**
-    * Compares resource managers through the underlying delegates; when the argument is another
-    * wrapper, its own delegate is used for the comparison.
-    *
-    * @param xaRes the resource to compare with
-    * @return whatever the delegate's <code>isSameRM</code> returns
-    * @throws XAException if either side cannot connect (XAER_RMERR) or the delegate fails
-    */
-   @Override
-   public boolean isSameRM(XAResource xaRes) throws XAException
-   {
-      if (xaRes instanceof ActiveMQXAResourceWrapper)
-      {
-         xaRes = ((ActiveMQXAResourceWrapper)xaRes).getDelegate(false);
-      }
-
-      XAResource xaResource = getDelegate(false);
-      try
-      {
-         return xaResource.isSameRM(xaRes);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   /**
-    * Delegates <code>prepare</code> to the connected session; a missing connection is reported as XA_RETRY.
-    *
-    * @param xid the transaction branch to prepare
-    * @return the delegate's vote
-    * @throws XAException if no connection can be made (XA_RETRY) or the delegate fails
-    */
-   @Override
-   public int prepare(final Xid xid) throws XAException
-   {
-      XAResource xaResource = getDelegate(true);
-      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("prepare " + xaResource + " xid " + xid);
-      }
-      try
-      {
-         return xaResource.prepare(xid);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   /**
-    * Delegates <code>start</code> to the connected session.
-    *
-    * @param xid   the transaction branch
-    * @param flags passed through to the delegate
-    * @throws XAException if no connection can be made (XAER_RMERR) or the delegate fails
-    */
-   @Override
-   public void start(final Xid xid, final int flags) throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("start " + xaResource + " xid " + xid);
-      }
-      try
-      {
-         xaResource.start(xid, flags);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   /**
-    * Delegates <code>end</code> to the connected session.
-    *
-    * @param xid   the transaction branch
-    * @param flags passed through to the delegate
-    * @throws XAException if no connection can be made (XAER_RMERR) or the delegate fails
-    */
-   @Override
-   public void end(final Xid xid, final int flags) throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("end " + xaResource + " xid " + xid);
-      }
-      try
-      {
-         xaResource.end(xid, flags);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   /**
-    * @return the delegate's transaction timeout
-    * @throws XAException if no connection can be made (XAER_RMERR) or the delegate fails
-    */
-   @Override
-   public int getTransactionTimeout() throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("getTransactionTimeout " + xaResource);
-      }
-      try
-      {
-         return xaResource.getTransactionTimeout();
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   /**
-    * @param seconds the timeout passed through to the delegate
-    * @return whatever the delegate's <code>setTransactionTimeout</code> returns
-    * @throws XAException if no connection can be made (XAER_RMERR) or the delegate fails
-    */
-   @Override
-   public boolean setTransactionTimeout(final int seconds) throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("setTransactionTimeout " + xaResource + " seconds=" + seconds);
-      }
-      try
-      {
-         return xaResource.setTransactionTimeout(seconds);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   /**
-    * Failure callback: logs the failure (at debug level only for a DISCONNECTED type) and closes
-    * the connection so that the next call reconnects.
-    */
-   @Override
-   public void connectionFailed(final ActiveMQException me, boolean failedOver)
-   {
-      if (me.getType() == ActiveMQExceptionType.DISCONNECTED)
-      {
-         if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-         {
-            ActiveMQJMSServerLogger.LOGGER.debug("being disconnected for server shutdown", me);
-         }
-      }
-      else
-      {
-         ActiveMQJMSServerLogger.LOGGER.xaRecoverConnectionError(me, csf);
-      }
-      close();
-   }
-
-   /**
-    * Same as {@link #connectionFailed(ActiveMQException, boolean)}; the scale down target is ignored.
-    */
-   @Override
-   public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID)
-   {
-      connectionFailed(me, failedOver);
-   }
-
-   /** Nothing to do before a reconnect. */
-   @Override
-   public void beforeReconnect(final ActiveMQException me)
-   {
-   }
-
-   /**
-    * Get the connectionFactory XAResource
-    *
-    * @param retry when true a connection failure is reported as XA_RETRY, otherwise as XAER_RMERR
-    * @return the connectionFactory
-    * @throws XAException for any problem
-    */
-   private XAResource getDelegate(boolean retry) throws XAException
-   {
-      XAResource result = null;
-      Exception error = null;
-      try
-      {
-         result = connect();
-      }
-      catch (Exception e)
-      {
-         error = e;
-      }
-
-      if (result != null)
-      {
-         return result;
-      }
-
-      // we should always throw a retry for certain methods (commit etc.); if not, the tx is marked as heuristic
-      // and all chaos is let loose
-      final XAException xae;
-      if (retry)
-      {
-         xae = new XAException("Connection unavailable for xa recovery");
-         xae.errorCode = XAException.XA_RETRY;
-      }
-      else
-      {
-         xae = new XAException("Error trying to connect to any providers for xa recovery");
-         xae.errorCode = XAException.XAER_RMERR;
-      }
-      if (error != null)
-      {
-         xae.initCause(error);
-      }
-      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
-      }
-      throw xae;
-   }
-
-   /**
-    * Connect to the server if not already done so
-    *
-    * @return the connectionFactory XAResource
-    * @throws Exception for any problem; ActiveMQNotConnectedException when no configuration could be connected
-    */
-   protected XAResource connect() throws Exception
-   {
-      // Do we already have a valid connectionFactory?
-      synchronized (ActiveMQXAResourceWrapper.lock)
-      {
-         if (delegate != null)
-         {
-            return delegate;
-         }
-      }
-
-      for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs)
-      {
-
-         if (xaRecoveryConfig == null)
-         {
-            continue;
-         }
-         if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-         {
-            ActiveMQJMSServerLogger.LOGGER.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs));
-         }
-
-         // Work on locals and only publish to the fields once the whole chain is up, so a failed attempt
-         // never leaves a closed locator or factory behind in the wrapper's state
-         ServerLocator locator = null;
-         ClientSessionFactory factory = null;
-         ClientSession cs = null;
-
-         try
-         {
-            // setting ha=false because otherwise the connector would go towards any server, causing Heuristic exceptions
-            // we really need to control what server it's connected to
-
-            // Manual configuration may still use discovery, so we will keep this
-            if (xaRecoveryConfig.getDiscoveryConfiguration() != null)
-            {
-               locator = ActiveMQClient.createServerLocator(false, xaRecoveryConfig.getDiscoveryConfiguration());
-            }
-            else
-            {
-               locator = ActiveMQClient.createServerLocator(false, xaRecoveryConfig.getTransportConfig());
-            }
-            locator.disableFinalizeCheck();
-            factory = locator.createSessionFactory();
-            if (xaRecoveryConfig.getUsername() == null)
-            {
-               cs = factory.createSession(true, false, false);
-            }
-            else
-            {
-               cs = factory.createSession(xaRecoveryConfig.getUsername(),
-                  xaRecoveryConfig.getPassword(),
-                  true,
-                  false,
-                  false,
-                  false,
-                  1);
-            }
-         }
-         catch (Throwable e)
-         {
-            ActiveMQJMSServerLogger.LOGGER.xaRecoverAutoConnectionError(e, xaRecoveryConfig);
-            if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
-            {
-               ActiveMQJMSServerLogger.LOGGER.debug(e.getMessage(), e);
-            }
-
-            // release whatever part of the chain was created before trying the next configuration
-            closeQuietly(cs, factory, locator);
-            continue;
-         }
-
-         synchronized (ActiveMQXAResourceWrapper.lock)
-         {
-            serverLocator = locator;
-            csf = factory;
-            delegate = cs;
-         }
-
-         // registered after publishing, so a failure callback's close() sees (and releases) this connection
-         cs.addFailureListener(this);
-
-         return cs;
-      }
-      ActiveMQJMSServerLogger.LOGGER.recoveryConnectFailed(Arrays.toString(xaRecoveryConfigs));
-      throw new ActiveMQNotConnectedException();
-   }
-
-   /* (non-Javadoc)
-    * @see java.lang.Object#toString()
-    */
-   @Override
-   public String toString()
-   {
-      return "ActiveMQXAResourceWrapper [serverLocator=" + serverLocator +
-         ", csf=" +
-         csf +
-         ", delegate=" +
-         delegate +
-         ", xaRecoveryConfigs=" +
-         Arrays.toString(xaRecoveryConfigs) +
-         ", instance=" +
-         System.identityHashCode(this) +
-         "]";
-   }
-
-   /**
-    * Close the connection
-    */
-   public void close()
-   {
-      final ServerLocator oldServerLocator;
-      final ClientSessionFactory oldCSF;
-      final ClientSession oldDelegate;
-      // detach the state under the lock, then close outside of it
-      synchronized (ActiveMQXAResourceWrapper.lock)
-      {
-         oldCSF = csf;
-         csf = null;
-         oldDelegate = delegate;
-         delegate = null;
-         oldServerLocator = serverLocator;
-         serverLocator = null;
-      }
-
-      closeQuietly(oldDelegate, oldCSF, oldServerLocator);
-   }
-
-   /**
-    * Closes the session, then the factory, then the locator. Each close is attempted independently:
-    * null arguments are skipped and a failure is only logged at debug level.
-    */
-   private static void closeQuietly(final ClientSession session,
-                                    final ClientSessionFactory factory,
-                                    final ServerLocator locator)
-   {
-      if (session != null)
-      {
-         try
-         {
-            session.close();
-         }
-         catch (Throwable ignorable)
-         {
-            ActiveMQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
-         }
-      }
-
-      if (factory != null)
-      {
-         try
-         {
-            factory.close();
-         }
-         catch (Throwable ignorable)
-         {
-            ActiveMQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
-         }
-      }
-
-      if (locator != null)
-      {
-         try
-         {
-            locator.close();
-         }
-         catch (Throwable ignorable)
-         {
-            ActiveMQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
-         }
-      }
-   }
-
-   /**
-    * Logs the XAException, closes the connection so the next call will reconnect,
-    * and rethrows the exception.
-    *
-    * @param e the xa exception
-    * @return never
-    * @throws XAException always
-    */
-   protected XAException check(final XAException e) throws XAException
-   {
-      ActiveMQJMSServerLogger.LOGGER.xaRecoveryError(e);
-
-
-      // If any exception happened, we close the connection so we may start fresh
-      close();
-      throw e;
-   }
-
-   @Override
-   protected void finalize() throws Throwable
-   {
-      try
-      {
-         close();
-      }
-      finally
-      {
-         super.finalize();
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java
deleted file mode 100644
index d314c86..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.api.core.ActiveMQExceptionType;
-import org.apache.activemq.api.core.Pair;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.ClusterTopologyListener;
-import org.apache.activemq.api.core.client.ServerLocator;
-import org.apache.activemq.api.core.client.SessionFailureListener;
-import org.apache.activemq.api.core.client.TopologyMember;
-import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
-import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
-
-/**
- * <p>This class will have a simple Connection Factory and will listen
- * for topology updates. </p>
- * <p>This Discovery is instantiated by {@link ActiveMQRecoveryRegistry}.</p>
- *
- * @author clebertsuconic
- */
-public class RecoveryDiscovery implements SessionFailureListener
-{
-
-   private ServerLocator locator;
-   private ClientSessionFactoryInternal sessionFactory;
-   private final XARecoveryConfig config;
-   /** number of users currently referencing this discovery, see {@link #incrementUsage()} */
-   private final AtomicInteger usage = new AtomicInteger(0);
-   private boolean started = false;
-
-
-   /**
-    * @param config the recovery configuration used to create the server locator; it also defines
-    *               this discovery's identity for {@link #equals(Object)} and {@link #hashCode()}
-    */
-   public RecoveryDiscovery(XARecoveryConfig config)
-   {
-      this.config = config;
-   }
-
-   /**
-    * Creates the locator and session factory and registers for topology updates, unless already started.
-    * If the session factory cannot be created, this discovery is stopped again and reported to the
-    * {@link ActiveMQRecoveryRegistry} as failed.
-    *
-    * @param retry when true, a startup failure is not logged through <code>xaRecoveryStartError</code>
-    */
-   public synchronized void start(boolean retry)
-   {
-      if (!started)
-      {
-         ActiveMQJMSServerLogger.LOGGER.debug("Starting RecoveryDiscovery on " + config);
-         started = true;
-
-         locator = config.createServerLocator();
-         locator.disableFinalizeCheck();
-         locator.addClusterTopologyListener(new InternalListener(config));
-         try
-         {
-            sessionFactory = (ClientSessionFactoryInternal) locator.createSessionFactory();
-            // We are using the SessionFactoryInternal here directly as we don't have information to connect with an user and password
-            // on the session as all we want here is to get the topology
-            // in case of failure we will retry
-            sessionFactory.addFailureListener(this);
-
-            ActiveMQJMSServerLogger.LOGGER.debug("RecoveryDiscovery started fine on " + config);
-         }
-         catch (Exception startupError)
-         {
-            if (!retry)
-            {
-               ActiveMQJMSServerLogger.LOGGER.xaRecoveryStartError(config);
-            }
-            stop();
-            ActiveMQRecoveryRegistry.getInstance().failedDiscovery(this);
-         }
-
-      }
-   }
-
-   /** Synchronized variant of {@link #internalStop()}. */
-   public synchronized void stop()
-   {
-      internalStop();
-   }
-
-   /**
-    * we may have several connection factories referencing the same connection recovery entry.
-    * Because of that we need to make a count of the number of the instances that are referencing it,
-    * so we will remove it as soon as we are done
-    *
-    * @return the usage count after adding one reference
-    */
-   public int incrementUsage()
-   {
-      return usage.incrementAndGet();
-   }
-
-   /**
-    * Releases one reference taken with {@link #incrementUsage()}.
-    *
-    * @return the usage count after removing one reference
-    */
-   public int decrementUsage()
-   {
-      return usage.decrementAndGet();
-   }
-
-
-   @Override
-   protected void finalize()
-   {
-      // I don't think it's a good thing to synchronize a method on a finalize,
-      // hence the internalStop (no sync) call here
-      internalStop();
-   }
-
-   /**
-    * Closes the session factory and the locator if this discovery is started, and clears both references.
-    * Close failures are only logged at debug level. This method does no locking of its own.
-    */
-   protected void internalStop()
-   {
-      if (started)
-      {
-         started = false;
-         try
-         {
-            if (sessionFactory != null)
-            {
-               sessionFactory.close();
-            }
-         }
-         catch (Exception ignored)
-         {
-            ActiveMQJMSServerLogger.LOGGER.debug(ignored, ignored);
-         }
-
-         try
-         {
-            // the locator is still null when start() failed before it could be created
-            if (locator != null)
-            {
-               locator.close();
-            }
-         }
-         catch (Exception ignored)
-         {
-            ActiveMQJMSServerLogger.LOGGER.debug(ignored, ignored);
-         }
-
-         sessionFactory = null;
-         locator = null;
-      }
-   }
-
-
-   /**
-    * Forwards topology "node up" events to the {@link ActiveMQRecoveryRegistry}, together with the
-    * credentials of the configuration this listener was created for.
-    */
-   static final class InternalListener implements ClusterTopologyListener
-   {
-      private final XARecoveryConfig config;
-
-      public InternalListener(final XARecoveryConfig config)
-      {
-         this.config = config;
-      }
-
-      @Override
-      public void nodeUP(TopologyMember topologyMember, boolean last)
-      {
-         // There is a case where the backup announce itself,
-         // we need to ignore a case where getLive is null
-         if (topologyMember.getLive() != null)
-         {
-            Pair<TransportConfiguration, TransportConfiguration> connector =
-               new Pair<TransportConfiguration, TransportConfiguration>(topologyMember.getLive(),
-                                                                        topologyMember.getBackup());
-            ActiveMQRecoveryRegistry.getInstance().nodeUp(topologyMember.getNodeId(), connector,
-                                                         config.getUsername(), config.getPassword());
-         }
-      }
-
-      @Override
-      public void nodeDown(long eventUID, String nodeID)
-      {
-         // I'm not putting any node down, since it may have previous transactions hanging, however at some point we may
-         //change it have some sort of timeout for removal
-      }
-
-   }
-
-
-   /**
-    * Failure callback: logs a warning, stops this discovery (without synchronization) and reports it
-    * to the {@link ActiveMQRecoveryRegistry} as failed.
-    */
-   @Override
-   public void connectionFailed(ActiveMQException exception, boolean failedOver)
-   {
-      if (exception.getType() == ActiveMQExceptionType.DISCONNECTED)
-      {
-         ActiveMQJMSServerLogger.LOGGER.warn("being disconnected for server shutdown", exception);
-      }
-      else
-      {
-         ActiveMQJMSServerLogger.LOGGER.warn("Notified of connection failure in xa discovery, we will retry on the next recovery",
-                                            exception);
-      }
-      internalStop();
-      ActiveMQRecoveryRegistry.getInstance().failedDiscovery(this);
-   }
-
-   /**
-    * Same as {@link #connectionFailed(ActiveMQException, boolean)}; the scale down target is ignored.
-    */
-   @Override
-   public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID)
-   {
-      connectionFailed(me, failedOver);
-   }
-
-   @Override
-   public void beforeReconnect(ActiveMQException exception)
-   {
-   }
-
-   /* (non-Javadoc)
-    * @see java.lang.Object#toString()
-    */
-   @Override
-   public String toString()
-   {
-      return "RecoveryDiscovery [config=" + config + ", started=" + started + "]";
-   }
-
-   /** Identity is defined by the configuration only. */
-   @Override
-   public int hashCode()
-   {
-      return config.hashCode();
-   }
-
-   /** Two discoveries are equal when their configurations are equal. */
-   @Override
-   public boolean equals(Object o)
-   {
-      // instanceof is false for null, so no separate null test is needed
-      if (!(o instanceof RecoveryDiscovery))
-      {
-         return false;
-      }
-      RecoveryDiscovery discovery = (RecoveryDiscovery) o;
-
-      return config.equals(discovery.config);
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java
deleted file mode 100644
index cf23589..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import java.util.Arrays;
-
-import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.ActiveMQClient;
-import org.apache.activemq.api.core.client.ServerLocator;
-import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
-
-/**
- *
- * This represents the configuration of a single connection factory.
- *
- * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
- * @author Clebert Suconic
- *
- * A wrapper around info needed for the xa recovery resource
- *         Date: 3/23/11
- *         Time: 10:15 AM
- */
-public class XARecoveryConfig
-{
-
-   private final boolean ha;
-   private final TransportConfiguration[] transportConfiguration;
-   private final DiscoveryGroupConfiguration discoveryConfiguration;
-   private final String username;
-   private final String password;
-
-   /**
-    * Builds a configuration from the server locator of the given connection factory: the locator's
-    * discovery group configuration is used when it has one, its static transport configurations otherwise.
-    *
-    * @param factory  the connection factory whose server locator is inspected
-    * @param userName the user name to store
-    * @param password the password to store
-    * @return the new configuration
-    */
-   public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory,
-                                            String userName,
-                                            String password)
-   {
-      final ServerLocator factoryLocator = factory.getServerLocator();
-      final DiscoveryGroupConfiguration discovery = factoryLocator.getDiscoveryGroupConfiguration();
-
-      if (discovery != null)
-      {
-         return new XARecoveryConfig(factoryLocator.isHA(), discovery, userName, password);
-      }
-
-      return new XARecoveryConfig(factoryLocator.isHA(), factoryLocator.getStaticTransportConfigurations(), userName, password);
-   }
-
-   /**
-    * Creates a configuration based on static transport configurations; the discovery configuration is null.
-    */
-   public XARecoveryConfig(final boolean ha, final TransportConfiguration[] transportConfiguration, final String username, final String password)
-   {
-      this(ha, transportConfiguration, null, username, password);
-   }
-
-   /**
-    * Creates a configuration based on a discovery group; the transport configuration is null.
-    */
-   public XARecoveryConfig(final boolean ha, final DiscoveryGroupConfiguration discoveryConfiguration, final String username, final String password)
-   {
-      this(ha, null, discoveryConfiguration, username, password);
-   }
-
-   /** Single place where all fields are assigned; the public constructors delegate here. */
-   private XARecoveryConfig(final boolean ha,
-                            final TransportConfiguration[] transportConfiguration,
-                            final DiscoveryGroupConfiguration discoveryConfiguration,
-                            final String username,
-                            final String password)
-   {
-      this.ha = ha;
-      this.transportConfiguration = transportConfiguration;
-      this.discoveryConfiguration = discoveryConfiguration;
-      this.username = username;
-      this.password = password;
-   }
-
-   public boolean isHA()
-   {
-      return this.ha;
-   }
-
-   public DiscoveryGroupConfiguration getDiscoveryConfiguration()
-   {
-      return this.discoveryConfiguration;
-   }
-
-   public TransportConfiguration[] getTransportConfig()
-   {
-      return this.transportConfiguration;
-   }
-
-   public String getUsername()
-   {
-      return this.username;
-   }
-
-   public String getPassword()
-   {
-      return this.password;
-   }
-
-
-   /**
-    * Create a serverLocator using the configuration: from the discovery configuration when one is set,
-    * from the transport configuration otherwise.
-    * @return locator
-    */
-   public ServerLocator createServerLocator()
-   {
-      final DiscoveryGroupConfiguration discovery = getDiscoveryConfiguration();
-      if (discovery == null)
-      {
-         return ActiveMQClient.createServerLocator(isHA(), getTransportConfig());
-      }
-      return ActiveMQClient.createServerLocator(isHA(), discovery);
-   }
-
-   /**
-    * Hashes the same fields {@link #equals(Object)} compares: the discovery and transport configurations.
-    */
-   @Override
-   public int hashCode()
-   {
-      final int discoveryHash = discoveryConfiguration == null ? 0 : discoveryConfiguration.hashCode();
-      return 31 * (31 + discoveryHash) + Arrays.hashCode(transportConfiguration);
-   }
-
-   /*
-    * We don't use username and password on purpose.
-    * Just having the connector is enough, as we don't want to duplicate resources just because of usernames
-    */
-   @Override
-   public boolean equals(Object obj)
-   {
-      if (this == obj)
-      {
-         return true;
-      }
-      if (obj == null || getClass() != obj.getClass())
-      {
-         return false;
-      }
-
-      final XARecoveryConfig that = (XARecoveryConfig)obj;
-      final boolean sameDiscovery = discoveryConfiguration == null
-         ? that.discoveryConfiguration == null
-         : discoveryConfiguration.equals(that.discoveryConfiguration);
-
-      return sameDiscovery && Arrays.equals(transportConfiguration, that.transportConfiguration);
-   }
-
-   /* (non-Javadoc)
-    * @see java.lang.Object#toString()
-    */
-   @Override
-   public String toString()
-   {
-      // the password is deliberately masked
-      final StringBuilder text = new StringBuilder("XARecoveryConfig [transportConfiguration = ");
-      text.append(Arrays.toString(transportConfiguration));
-      text.append(", discoveryConfiguration = ").append(discoveryConfiguration);
-      text.append(", username=").append(username);
-      text.append(", password=****]");
-      return text.toString();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java
deleted file mode 100644
index 43746cf..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAManagedConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAManagedConnectionFactory.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAManagedConnectionFactory.java
index 0a3fa9f..7cd6a48 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAManagedConnectionFactory.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAManagedConnectionFactory.java
@@ -30,7 +30,7 @@ import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
+import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
 
 /**
  * ActiveMQ ManagedConnectionFactory

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
index 6747758..e76313a 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
@@ -44,12 +44,12 @@ import org.apache.activemq.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.jms.client.ActiveMQDestination;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
 import org.apache.activemq.ra.ActiveMQRABundle;
 import org.apache.activemq.ra.ActiveMQRAConnectionFactory;
 import org.apache.activemq.ra.ActiveMQRALogger;
 import org.apache.activemq.ra.ActiveMQRaUtils;
 import org.apache.activemq.ra.ActiveMQResourceAdapter;
+import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
 import org.apache.activemq.utils.FutureLatch;
 import org.apache.activemq.utils.SensitiveDataCodec;
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-ra/src/main/java/org/apache/activemq/ra/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/recovery/RecoveryManager.java b/activemq-ra/src/main/java/org/apache/activemq/ra/recovery/RecoveryManager.java
index cf8864a..8f0754a 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/recovery/RecoveryManager.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/recovery/RecoveryManager.java
@@ -18,12 +18,14 @@ package org.apache.activemq.ra.recovery;
 
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import java.util.ServiceLoader;
 import java.util.Set;
 
 import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.jms.server.recovery.ActiveMQRegistryBase;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
 import org.apache.activemq.ra.ActiveMQRALogger;
+import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry;
+import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistryImpl;
+import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
 import org.apache.activemq.utils.ClassloadingUtil;
 import org.apache.activemq.utils.ConcurrentHashSet;
 
@@ -33,7 +35,7 @@ import org.apache.activemq.utils.ConcurrentHashSet;
  */
 public final class RecoveryManager
 {
-   private ActiveMQRegistryBase registry;
+   private ActiveMQRegistry registry;
 
    private static final String RESOURCE_RECOVERY_CLASS_NAMES = "org.jboss.as.messaging.jms.AS7RecoveryRegistry;"
             + "org.jboss.as.integration.activemq.recovery.AS5RecoveryRegistry";
@@ -97,7 +99,15 @@ public final class RecoveryManager
       {
          try
          {
-            registry = (ActiveMQRegistryBase) safeInitNewInstance(locatorClasse);
+            ServiceLoader<ActiveMQRegistry> sl = ServiceLoader.load(ActiveMQRegistry.class);
+            if (sl.iterator().hasNext())
+            {
+               registry = sl.iterator().next();
+            }
+            else
+            {
+               registry = ActiveMQRegistryImpl.getInstance();
+            }
          }
          catch (Throwable e)
          {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/pom.xml b/activemq-service-extensions/pom.xml
index c0072b3..524cfdd 100644
--- a/activemq-service-extensions/pom.xml
+++ b/activemq-service-extensions/pom.xml
@@ -19,6 +19,11 @@
          <version>${project.version}</version>
       </dependency>
       <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-jms-client</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
          <groupId>org.jboss.logging</groupId>
          <artifactId>jboss-logging</artifactId>
       </dependency>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistry.java
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistry.java b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistry.java
new file mode 100644
index 0000000..bb81b23
--- /dev/null
+++ b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistry.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.service.extensions.xa.recovery;
+
+/**
+ * Contract for the registry that XA recovery configurations are handed to.
+ * <p>
+ * The resource adapter's {@code RecoveryManager} looks an implementation up through
+ * {@link java.util.ServiceLoader} and falls back to {@link ActiveMQRegistryImpl} when no
+ * provider is found.
+ *
+ * @author mtaylor
+ */
+public interface ActiveMQRegistry
+{
+   /**
+    * Lifecycle hook; presumably invoked before the registry is used (no call site is visible
+    * next to this interface - confirm against the callers).
+    */
+   void init();
+
+   /**
+    * Hands a recovery configuration to the registry.
+    *
+    * @param resourceConfig the configuration to add
+    */
+   void register(XARecoveryConfig resourceConfig);
+
+   /**
+    * Withdraws a recovery configuration from the registry.
+    *
+    * @param resourceConfig the configuration to remove
+    */
+   void unRegister(XARecoveryConfig resourceConfig);
+
+   /**
+    * Lifecycle hook; presumably invoked once the registry is no longer needed - confirm.
+    */
+   void stop();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistryImpl.java
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistryImpl.java b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistryImpl.java
new file mode 100644
index 0000000..2dd787b
--- /dev/null
+++ b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQRegistryImpl.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.service.extensions.xa.recovery;
+
+/**
+ * Fallback {@link ActiveMQRegistry} whose operations are all no-ops.
+ * <p>
+ * The resource adapter's {@code RecoveryManager} uses {@link #getInstance()} when
+ * {@link java.util.ServiceLoader} finds no other {@link ActiveMQRegistry} provider.
+ *
+ * @author mtaylor
+ */
+public class ActiveMQRegistryImpl implements ActiveMQRegistry
+{
+   /**
+    * The shared instance. Created during class initialisation, which the JVM performs at
+    * most once, so concurrent {@link #getInstance()} callers always see the same object.
+    */
+   private static final ActiveMQRegistryImpl INSTANCE = new ActiveMQRegistryImpl();
+
+   /**
+    * @return the shared no-op registry; never {@code null}
+    */
+   public static ActiveMQRegistry getInstance()
+   {
+      return INSTANCE;
+   }
+
+   /**
+    * Does nothing.
+    *
+    * @param resourceConfig ignored
+    */
+   @Override
+   public void register(XARecoveryConfig resourceConfig)
+   {
+      // intentionally empty: this fallback keeps no state
+   }
+
+   /**
+    * Does nothing.
+    *
+    * @param resourceConfig ignored
+    */
+   @Override
+   public void unRegister(XARecoveryConfig resourceConfig)
+   {
+      // intentionally empty: this fallback keeps no state
+   }
+
+   /**
+    * Does nothing.
+    */
+   @Override
+   public void stop()
+   {
+      // intentionally empty: nothing was started
+   }
+
+   /**
+    * Does nothing.
+    */
+   @Override
+   public void init()
+   {
+      // intentionally empty: nothing to set up
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java
new file mode 100644
index 0000000..140504a
--- /dev/null
+++ b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.service.extensions.xa.recovery;
+
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.Logger;
+import org.jboss.logging.annotations.Cause;
+import org.jboss.logging.annotations.LogMessage;
+import org.jboss.logging.annotations.Message;
+import org.jboss.logging.annotations.MessageLogger;
+import org.w3c.dom.Node;
+
+/**
+ * Message logger for the XA recovery service extension.
+ * <p>
+ * Logger code 12: every message id is six digits long and starts with 12. The third digit
+ * denotes the level:
+ * <pre>
+ * INFO  1
+ * WARN  2
+ * DEBUG 3
+ * ERROR 4
+ * TRACE 5
+ * FATAL 6
+ * </pre>
+ * so an INFO message lies between 121000 and 121999.
+ * <p>
+ * NOTE(review): several entries (JMS Server Manager, JNDI, deployer) do not look specific to
+ * XA recovery; presumably they were carried over from the JMS server logger - confirm before
+ * pruning.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
+ */
+@MessageLogger(projectCode = "AMQ")
+public interface ActiveMQXARecoveryLogger extends BasicLogger
+{
+   /**
+    * The default logger; its category is this interface's package name.
+    */
+   ActiveMQXARecoveryLogger LOGGER = Logger.getMessageLogger(ActiveMQXARecoveryLogger.class,
+                                                             ActiveMQXARecoveryLogger.class.getPackage().getName());
+
+   /** Logs at INFO that a cached command is being run. */
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 121003,
+            value = "JMS Server Manager Running cached command for {0}",
+            format = Message.Format.MESSAGE_FORMAT)
+   void serverRunningCachedCommand(Runnable run);
+
+   /** Logs at INFO that a command is being cached because the JMS server is not active yet. */
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 121004,
+            value = "JMS Server Manager Caching command for {0} since the JMS Server is not active yet",
+            format = Message.Format.MESSAGE_FORMAT)
+   void serverCachingCommand(Object runnable);
+
+   /** Logs at INFO that a connector's "0.0.0.0" host is being switched to {@code newHost}. */
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 121005,
+            value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\". If this new address is incorrect please manually configure the connector to use the proper one.",
+            format = Message.Format.MESSAGE_FORMAT)
+   void invalidHostForConnector(String name, String newHost);
+
+   /** Logs at WARN that a queue does not exist on a topic. */
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122007,
+            value = "Queue {0} does not exist on the topic {1}. It was deleted manually probably.",
+            format = Message.Format.MESSAGE_FORMAT)
+   void noQueueOnTopic(String queueName, String name);
+
+   /** Logs at WARN that XA recovery could not connect to any ActiveMQ server. */
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122008,
+            value = "XA Recovery can not connect to any ActiveMQ server on recovery {0}",
+            format = Message.Format.MESSAGE_FORMAT)
+   void recoveryConnectFailed(String s);
+
+   /** Logs at WARN that unbinding {@code key} from JNDI failed. */
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122011,
+            value = "error unbinding {0} from JNDI",
+            format = Message.Format.MESSAGE_FORMAT)
+   void jndiUnbindError(@Cause Exception e, String key);
+
+   /** Logs at WARN a JMS Server Manager error. */
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122012,
+            value = "JMS Server Manager error",
+            format = Message.Format.MESSAGE_FORMAT)
+   void jmsServerError(@Cause Exception e);
+
+   /** Logs at WARN an error in XA recovery "recover". */
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122013,
+            value = "Error in XA Recovery recover",
+            format = Message.Format.MESSAGE_FORMAT)
+   void xaRecoverError(@Cause Exception e);
+
+   /** Logs at WARN a connection failure notification in the XA recovery connection factory. */
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122014,
+            value = "Notified of connection failure in xa recovery connectionFactory for provider {0} will attempt reconnect on next pass",
+            format = Message.Format.MESSAGE_FORMAT)
+   void xaRecoverConnectionError(@Cause Exception e, ClientSessionFactory csf);
+
+   /** Logs at WARN that auto-generated resource recovery could not connect. */
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122015,
+            value = "Can not connect to {0} on auto-generated resource recovery",
+            format = Message.Format.MESSAGE_FORMAT)
+   void xaRecoverAutoConnectionError(@Cause Throwable e, XARecoveryConfig csf);
+
+   /**
+    * Logs at DEBUG an error in XA recovery.
+    * NOTE(review): the id's third digit (2) is the WARN digit although the level is DEBUG.
+    */
+   @LogMessage(level = Logger.Level.DEBUG)
+   @Message(id = 122016,
+            value = "Error in XA Recovery",
+            format = Message.Format.MESSAGE_FORMAT)
+   void xaRecoveryError(@Cause Exception e);
+
+   /** Logs at WARN that correcting a connector's "0.0.0.0" host raised an exception. */
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122017,
+            value = "Tried to correct invalid \"host\" value \"0.0.0.0\" for \"{0}\" connector, but received an exception.",
+            format = Message.Format.MESSAGE_FORMAT)
+   void failedToCorrectHost(@Cause Exception e, String name);
+
+   /** Logs at WARN that recovery discovery could not be started and will be retried. */
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122018,
+            value = "Could not start recovery discovery on {0}, we will retry every recovery scan until the server is available",
+            format = Message.Format.MESSAGE_FORMAT)
+   void xaRecoveryStartError(XARecoveryConfig e);
+
+   /** Logs at ERROR that a JMS configuration lacks its key attribute. */
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 124000,
+            value = "key attribute missing for JMS configuration {0}",
+            format = Message.Format.MESSAGE_FORMAT)
+   void jmsConfigMissingKey(Node e);
+
+   /** Logs at ERROR that the JMS deployer failed to start. */
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 124002,
+            value = "Failed to start JMS deployer",
+            format = Message.Format.MESSAGE_FORMAT)
+   void jmsDeployerStartError(@Cause Exception e);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/8f91af1b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java
----------------------------------------------------------------------
diff --git a/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java
new file mode 100644
index 0000000..d9f4847
--- /dev/null
+++ b/activemq-service-extensions/src/main/java/org/apache/activemq/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.service.extensions.xa.recovery;
+
+import javax.transaction.xa.XAResource;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+
+/**
+ * A XAResourceRecovery instance that can be used to recover any JMS provider.
+ * <p>
+ * In reality only recover, rollback and commit will be called but we still need to implement all
+ * methods just in case.
+ * <p>
+ * To enable this add the following to the jbossts-properties file
+ * <pre>
+ * &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ1"
+ *                 value="org.apache.activemq.service.extensions.xa.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory"/&gt;
+ * </pre>
+ * <p>
+ * you'll need something like this if the ActiveMQ Server is remote
+ * <pre>
+ *      &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2"
+ *                  value="org.apache.activemq.service.extensions.xa.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/&gt;
+ * </pre>
+ * <p>
+ * you'll need something like this if the ActiveMQ Server is remote and has failover configured
+ * <pre>
+ *             &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2"
+ *                       value="org.apache.activemq.service.extensions.xa.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/&gt;
+ * </pre>
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ */
+public class ActiveMQXAResourceRecovery
+{
+   /** Flipped by every {@link #hasMoreResources()} call; initially {@code false}. */
+   private boolean hasMore;
+
+   /** The wrapper handed out by the getters; {@code null} until {@link #initialise(String)} completes. */
+   private ActiveMQXAResourceWrapper res;
+
+   public ActiveMQXAResourceRecovery()
+   {
+      if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.trace("Constructing ActiveMQXAResourceRecovery");
+      }
+   }
+
+   /**
+    * Builds the recovery resource from a configuration string.
+    * <p>
+    * {@code config} holds one entry per server, separated by {@code ;}. Each entry is parsed by
+    * {@link ConfigParser} and becomes one {@link XARecoveryConfig}.
+    *
+    * @param config the {@code ;}-separated server entries; must not be {@code null}
+    * @return always {@code true}
+    * @throws IllegalArgumentException if an entry is rejected by {@link ConfigParser}
+    */
+   public boolean initialise(final String config)
+   {
+      if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.trace(this + " intialise: " + config);
+      }
+
+      String[] configs = config.split(";");
+      XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
+      for (int i = 0; i < configs.length; i++)
+      {
+         ConfigParser parser = new ConfigParser(configs[i]);
+         TransportConfiguration transportConfiguration =
+            new TransportConfiguration(parser.getConnectorFactoryClassName(), parser.getConnectorParameters());
+         xaRecoveryConfigs[i] = new XARecoveryConfig(false,
+                                                     new TransportConfiguration[]{transportConfiguration},
+                                                     parser.getUsername(),
+                                                     parser.getPassword());
+      }
+
+      res = new ActiveMQXAResourceWrapper(xaRecoveryConfigs);
+
+      if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.trace(this + " initialised");
+      }
+
+      return true;
+   }
+
+   /**
+    * Tells the recovery manager whether another resource is available in the current sweep.
+    *
+    * @return {@code true} and {@code false} alternately, starting with {@code true}
+    */
+   public boolean hasMoreResources()
+   {
+      if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.trace(this + " hasMoreResources");
+      }
+
+      /*
+       * The way hasMoreResources is supposed to work is as follows:
+       * For each "sweep" the recovery manager will call hasMoreResources, then if it returns
+       * true it will call getXAResource.
+       * It will repeat that until hasMoreResources returns false.
+       * Then the sweep is over.
+       * For the next sweep hasMoreResources should return true, etc.
+       *
+       * In our case where we only need to return one XAResource per sweep,
+       * hasMoreResources should basically alternate between true and false.
+       */
+
+      hasMore = !hasMore;
+
+      return hasMore;
+   }
+
+   /**
+    * @return the wrapper built by {@link #initialise(String)}, or {@code null} before that call
+    */
+   public XAResource getXAResource()
+   {
+      if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQXARecoveryLogger.LOGGER.trace(this + " getXAResource");
+      }
+
+      return res;
+   }
+
+   /**
+    * @return a new one-element array holding the same value as {@link #getXAResource()}
+    */
+   public XAResource[] getXAResources()
+   {
+      return new XAResource[]{res};
+   }
+
+   /**
+    * Closes the wrapper, if one was created, when the garbage collector finalizes this object.
+    */
+   @Override
+   protected void finalize()
+   {
+      // res is still null when initialise() was never called or threw
+      if (res != null)
+      {
+         res.close();
+      }
+   }
+
+   /**
+    * Parses one server entry of the form
+    * {@code connectorFactoryClass[,username,password[,key=value]...]}.
+    */
+   public static class ConfigParser
+   {
+      private final String connectorFactoryClassName;
+
+      private final Map<String, Object> connectorParameters;
+
+      private final String username;
+
+      private final String password;
+
+      /**
+       * @param config the entry to parse
+       * @throws IllegalArgumentException if {@code config} is {@code null}, empty or made only of
+       *                                  commas, or if it splits into exactly two tokens
+       */
+      public ConfigParser(final String config)
+      {
+         if (config == null || config.length() == 0)
+         {
+            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
+         }
+
+         String[] strings = config.split(",");
+
+         // First (mandatory) param is the connector factory class name.
+         // split() returns an empty array for a config made only of commas.
+         if (strings.length < 1)
+         {
+            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
+         }
+
+         connectorFactoryClassName = strings[0].trim();
+
+         // Next two (optional) parameters are the username and password to use for creating the session for recovery
+         if (strings.length == 2)
+         {
+            throw new IllegalArgumentException("If username is specified, password must be specified too");
+         }
+
+         username = strings.length > 2 ? trimToNull(strings[1]) : null;
+         password = strings.length > 2 ? trimToNull(strings[2]) : null;
+
+         // other tokens are for connector configurations; a token that does not split into
+         // exactly a key and a value around '=' is skipped
+         connectorParameters = new HashMap<String, Object>();
+         for (int i = 3; i < strings.length; i++)
+         {
+            String[] str = strings[i].split("=");
+            if (str.length == 2)
+            {
+               connectorParameters.put(str[0].trim(), str[1].trim());
+            }
+         }
+      }
+
+      /** Trims {@code value} and maps an empty result to {@code null}. */
+      private static String trimToNull(final String value)
+      {
+         String trimmed = value.trim();
+         return trimmed.length() == 0 ? null : trimmed;
+      }
+
+      /** @return the trimmed first token of the entry */
+      public String getConnectorFactoryClassName()
+      {
+         return connectorFactoryClassName;
+      }
+
+      /** @return the parsed {@code key=value} tokens; empty when none were given */
+      public Map<String, Object> getConnectorParameters()
+      {
+         return connectorParameters;
+      }
+
+      /** @return the username, or {@code null} when absent or empty after trimming */
+      public String getUsername()
+      {
+         return username;
+      }
+
+      /** @return the password, or {@code null} when absent or empty after trimming */
+      public String getPassword()
+      {
+         return password;
+      }
+   }
+}


[3/3] activemq-6 git commit: merge #32 - wildfly recovery

Posted by an...@apache.org.
merge #32 - wildfly recovery


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/2d69d9dc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/2d69d9dc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/2d69d9dc

Branch: refs/heads/master
Commit: 2d69d9dc7c4e994e3ee54045bb582ce216c6a0c2
Parents: 09a6fee 8f91af1
Author: Andy Taylor <an...@gmail.com>
Authored: Wed Dec 10 12:50:26 2014 +0000
Committer: Andy Taylor <an...@gmail.com>
Committed: Wed Dec 10 12:50:26 2014 +0000

----------------------------------------------------------------------
 activemq-jms-server/pom.xml                     |  11 +-
 .../activemq/jms/bridge/impl/JMSBridgeImpl.java |  18 +-
 .../jms/server/ActiveMQJMSServerLogger.java     |  11 -
 .../recovery/ActiveMQRecoveryRegistry.java      | 254 ---------
 .../server/recovery/ActiveMQRegistryBase.java   |  75 ---
 .../recovery/ActiveMQXAResourceRecovery.java    | 235 --------
 .../recovery/ActiveMQXAResourceWrapper.java     | 535 -------------------
 .../jms/server/recovery/RecoveryDiscovery.java  | 236 --------
 .../jms/server/recovery/XARecoveryConfig.java   | 171 ------
 .../jms/server/recovery/package-info.java       |  18 -
 .../ra/ActiveMQRAManagedConnectionFactory.java  |   2 +-
 .../activemq/ra/inflow/ActiveMQActivation.java  |   2 +-
 .../activemq/ra/recovery/RecoveryManager.java   |  18 +-
 activemq-service-extensions/pom.xml             |   5 +
 .../xa/recovery/ActiveMQRegistry.java           |  33 ++
 .../xa/recovery/ActiveMQRegistryImpl.java       |  60 +++
 .../xa/recovery/ActiveMQXARecoveryLogger.java   | 118 ++++
 .../xa/recovery/ActiveMQXAResourceRecovery.java | 233 ++++++++
 .../xa/recovery/ActiveMQXAResourceWrapper.java  | 534 ++++++++++++++++++
 .../xa/recovery/XARecoveryConfig.java           | 171 ++++++
 tests/byteman-tests/pom.xml                     |   4 +-
 .../integration/ra/ResourceAdapterTest.java     |  21 +-
 tests/joram-tests/pom.xml                       |   4 +-
 tests/performance-tests/pom.xml                 |   4 +-
 tests/soak-tests/pom.xml                        |   4 +-
 tests/stress-tests/pom.xml                      |   4 +-
 tests/timing-tests/pom.xml                      |   4 +-
 27 files changed, 1203 insertions(+), 1582 deletions(-)
----------------------------------------------------------------------