You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by ti...@apache.org on 2018/01/25 17:08:14 UTC

[aries-tx-control] 02/02: Tx Control spec compliance - the XA JPA provider must support recovery to be the reference implementation

This is an automated email from the ASF dual-hosted git repository.

timothyjward pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-tx-control.git

commit 4f155dd818e87b584244647f18893d7c460b6c05
Author: Tim Ward <ti...@apache.org>
AuthorDate: Thu Jan 25 12:05:34 2018 -0500

    Tx Control spec compliance - the XA JPA provider must support recovery to be the reference implementation
---
 .../aries/tx/control/jpa/xa/impl/Activator.java    |   5 +-
 .../impl/JPAEntityManagerProviderFactoryImpl.java  |  61 ++++++++--
 .../jpa/xa/impl/JPAEntityManagerProviderImpl.java  |  45 ++++++-
 .../jpa/xa/impl/RecoverableXAResourceImpl.java     | 135 +++++++++++++++++++++
 .../xa/impl/XATxContextBindingEntityManager.java   |   2 +-
 .../impl/XATxContextBindingEntityManagerTest.java  |   2 +-
 6 files changed, 236 insertions(+), 14 deletions(-)

diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/Activator.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/Activator.java
index 9686e7f..50be0fe 100644
--- a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/Activator.java
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/Activator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.aries.tx.control.jpa.xa.impl;
 
+import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.XA_ENLISTMENT_ENABLED;
+
 import java.util.Dictionary;
 import java.util.Hashtable;
 
@@ -49,7 +51,8 @@ public class Activator extends JPAResourceActivator {
 	@Override
 	protected Dictionary<String, Object> getServiceProperties() {
 		Dictionary<String, Object> props = new Hashtable<>();
-		props.put("osgi.xa.enabled", Boolean.TRUE);
+		props.put(XA_ENLISTMENT_ENABLED, Boolean.TRUE);
+		props.put("osgi.recovery.enabled", Boolean.TRUE);
 		return props;
 	}
 
diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
index 51c112c..c171faf 100644
--- a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
@@ -24,6 +24,7 @@ import static org.apache.aries.tx.control.jpa.xa.impl.XAJPADataSourceSetup.JTA_D
 import static org.apache.aries.tx.control.jpa.xa.impl.XAJPADataSourceSetup.NON_JTA_DATA_SOURCE;
 import static org.osgi.service.transaction.control.TransactionStatus.NO_TRANSACTION;
 import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.LOCAL_ENLISTMENT_ENABLED;
+import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.OSGI_RECOVERY_IDENTIFIER;
 import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.PRE_ENLISTED_DB_CONNECTION;
 import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.TRANSACTIONAL_DB_CONNECTION;
 import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.XA_ENLISTMENT_ENABLED;
@@ -112,6 +113,11 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
 		Function<ThreadLocal<TransactionControl>, AbstractJPAEntityManagerProvider> create;
 		JDBCConnectionProvider provider = (JDBCConnectionProvider) resourceProviderProperties.get(TRANSACTIONAL_DB_CONNECTION);
 		
+		if(resourceProviderProperties.containsKey(OSGI_RECOVERY_IDENTIFIER)) {
+			LOGGER.warn("Unable to set a recovery identifier {}. This should be set when creating the JDBC Resource Provider", 
+					resourceProviderProperties.remove(OSGI_RECOVERY_IDENTIFIER));
+		}
+		
 		create = tx -> {
 				jpaPropsToUse.put(JTA_DATA_SOURCE, 
 					new ScopedConnectionDataSource(provider.getResource(tx.get())));
@@ -131,6 +137,12 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
 			LOGGER.error("No datasource supplied in the configuration");
 			throw new IllegalArgumentException("No pre-enlisted datasource could be found to create the EntityManagerFactory. Please provide either a javax.persistence.jtaDataSource");
 		}
+		
+		if(resourceProviderProperties.containsKey(OSGI_RECOVERY_IDENTIFIER)) {
+			LOGGER.warn("Unable to set a recovery identifier {} for a pre-enlisted connection", 
+					resourceProviderProperties.remove(OSGI_RECOVERY_IDENTIFIER));
+		}
+		
 		create = tx -> {
 			DataSource toUse = JPADataSourceHelper.poolIfNecessary(resourceProviderProperties, (DataSource) supplied);
 			jpaPropsToUse.put(JTA_DATA_SOURCE, toUse);
@@ -200,14 +212,14 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
 		Map<String, Object> resourceProviderProperties, ThreadLocal<TransactionControl> localStore, Runnable onClose) {
 		Map<String, Object> toUse;
 		if(checkEnlistment(resourceProviderProperties)) {
-			toUse = enlistDataSource(localStore, jpaProperties);
+			toUse = enlistDataSource(localStore, jpaProperties, resourceProviderProperties);
 		} else {
 			toUse = jpaProperties;
 		}
 		
 		setupTransactionManager(context, toUse, localStore, emfb);
 		
-		return localStore.get().notSupported(() -> internalBuilderCreate(emfb, toUse, localStore, onClose));
+		return localStore.get().notSupported(() -> internalBuilderCreate(emfb, toUse, resourceProviderProperties, localStore, onClose));
 	}
 
 	private boolean checkEnlistment(Map<String, Object> resourceProviderProperties) {
@@ -221,11 +233,12 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
 		return !toBoolean(resourceProviderProperties, PRE_ENLISTED_DB_CONNECTION, false);
 	}
 
-	private Map<String, Object> enlistDataSource(ThreadLocal<TransactionControl> tx, Map<String, Object> jpaProperties) {
+	private Map<String, Object> enlistDataSource(ThreadLocal<TransactionControl> tx, Map<String, Object> jpaProperties, Map<String, Object> resourceProviderProperties) {
 		Map<String, Object> toReturn = new HashMap<>(jpaProperties);
 		
 		DataSource enlistedDS = new EnlistingDataSource(tx, 
-				(DataSource)jpaProperties.get(JTA_DATA_SOURCE));
+				(DataSource)jpaProperties.get(JTA_DATA_SOURCE),
+				(String) resourceProviderProperties.get(OSGI_RECOVERY_IDENTIFIER));
 		
 		toReturn.put(JTA_DATA_SOURCE, enlistedDS);
 		
@@ -356,7 +369,8 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
 	}
 
 	private AbstractJPAEntityManagerProvider internalBuilderCreate(EntityManagerFactoryBuilder emfb,
-			Map<String, Object> jpaProperties, ThreadLocal<TransactionControl> tx, Runnable onClose) {
+			Map<String, Object> jpaProperties, Map<String, Object> providerProperties, 
+			ThreadLocal<TransactionControl> tx, Runnable onClose) {
 		EntityManagerFactory emf = emfb.createEntityManagerFactory(jpaProperties);
 		
 		validateEMF(emf);
@@ -369,7 +383,7 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
 			if (onClose != null) {
 				onClose.run();
 			}
-		});
+		}, context, jpaProperties, providerProperties);
 	}
 
 	private void validateEMF(EntityManagerFactory emf) {
@@ -396,7 +410,13 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
 		checkEnlistment(resourceProviderProperties);
 		validateEMF(emf);
 		
-		return new JPAEntityManagerProviderImpl(emf, new ThreadLocal<>(), null);
+		if(resourceProviderProperties.containsKey(OSGI_RECOVERY_IDENTIFIER)) {
+			LOGGER.warn("Unable to set a recovery identifier {} for an existing EntityManagerFactory", 
+					resourceProviderProperties.remove(OSGI_RECOVERY_IDENTIFIER));
+		}
+		
+		return new JPAEntityManagerProviderImpl(emf, new ThreadLocal<>(), null, null,
+				null, null);
 	}
 
 	public static boolean toBoolean(Map<String, Object> props, String key, boolean defaultValue) {
@@ -460,14 +480,18 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
 	public static class EnlistingDataSource implements DataSource {
 		
 		private final DataSource delegate;
+		
+		private final String recoveryIdentifier;
 
 		private final UUID resourceId = UUID.randomUUID();
 		
 		private final ThreadLocal<TransactionControl> txControlToUse;
 		
-		public EnlistingDataSource(ThreadLocal<TransactionControl> txControlToUse, DataSource delegate) {
+		public EnlistingDataSource(ThreadLocal<TransactionControl> txControlToUse, 
+				DataSource delegate, String recoveryIdentifier) {
 			this.txControlToUse = txControlToUse;
 			this.delegate = delegate;
+			this.recoveryIdentifier = recoveryIdentifier;
 		}
 		
 		public TransactionControl getTxControl() {
@@ -498,6 +522,23 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
 			return enlistedConnection(() -> delegate.getConnection());
 		}
 
+		/**
+		 * Used by the {@link RecoverableXAResourceImpl}
+		 * @return
+		 * @throws SQLException
+		 */
+		Connection getRawConnection() throws SQLException {
+			return delegate.getConnection();
+		}
+		/**
+		 * Used by the {@link RecoverableXAResourceImpl}
+		 * @return
+		 * @throws SQLException
+		 */
+		Connection getRawConnection(String username, String password) throws SQLException {
+			return delegate.getConnection(username, password);
+		}
+
 		public void setLoginTimeout(int seconds) throws SQLException {
 			delegate.setLoginTimeout(seconds);
 		}
@@ -537,7 +578,7 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
 					toReturn = new ScopedConnectionWrapper(toClose);
 				} else if (txContext.supportsXA()) {
 					toReturn = new TxConnectionWrapper(toClose);
-					txContext.registerXAResource(getXAResource(toClose), null);
+					txContext.registerXAResource(getXAResource(toClose), recoveryIdentifier);
 				} else {
 					throw new TransactionException(
 							"There is a transaction active, but it does not support XA participants");
@@ -562,7 +603,7 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
 			return toReturn;
 		}
 		
-		private XAResource getXAResource(Connection conn) throws SQLException {
+		public static XAResource getXAResource(Connection conn) throws SQLException {
 			if(conn instanceof XAConnectionWrapper) {
 				return ((XAConnectionWrapper)conn).getXaResource();
 			} else if(conn.isWrapperFor(XAConnectionWrapper.class)){
diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderImpl.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderImpl.java
index b040f1f..3005866 100644
--- a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderImpl.java
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderImpl.java
@@ -18,29 +18,72 @@
  */
 package org.apache.aries.tx.control.jpa.xa.impl;
 
+import static java.util.Optional.ofNullable;
+import static org.apache.aries.tx.control.jpa.xa.impl.XAJPADataSourceSetup.JTA_DATA_SOURCE;
+import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.OSGI_RECOVERY_IDENTIFIER;
+
+import java.util.Map;
 import java.util.UUID;
 
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 
 import org.apache.aries.tx.control.jpa.common.impl.AbstractJPAEntityManagerProvider;
+import org.apache.aries.tx.control.jpa.xa.impl.JPAEntityManagerProviderFactoryImpl.EnlistingDataSource;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.transaction.control.TransactionControl;
 import org.osgi.service.transaction.control.TransactionException;
+import org.osgi.service.transaction.control.recovery.RecoverableXAResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JPAEntityManagerProviderImpl extends AbstractJPAEntityManagerProvider {
 
+	private static final Logger LOG = LoggerFactory.getLogger(JPAEntityManagerProviderImpl.class);
+	
 	private final UUID					uuid	= UUID.randomUUID();
 
 	private final ThreadLocal<TransactionControl> tx;
+	
+	private final String recoveryIdentifier;
+	
+	private final ServiceRegistration<RecoverableXAResource> reg;
 
 	public JPAEntityManagerProviderImpl(EntityManagerFactory emf, ThreadLocal<TransactionControl> tx,
-			Runnable onClose) {
+			Runnable onClose, BundleContext ctx, Map<String, Object> jpaProps, 
+			Map<String, Object> providerProps) {
 		super(emf, onClose);
 		this.tx = tx;
+		
+		recoveryIdentifier = (String) ofNullable(providerProps)
+								.map(m -> m.get(OSGI_RECOVERY_IDENTIFIER))
+								.orElse(null);
+		
+		if(recoveryIdentifier != null) {
+			EnlistingDataSource ds = (EnlistingDataSource) jpaProps.get(JTA_DATA_SOURCE);
+			reg = ctx.registerService(RecoverableXAResource.class, 
+					new RecoverableXAResourceImpl(recoveryIdentifier, ds, 
+							(String) providerProps.get("recovery.user"),
+							(String) providerProps.get(".recovery.password)")), 
+					null);
+		} else {
+			reg = null;
+		}
 	}
 
 	@Override
 	public EntityManager getResource(TransactionControl txControl) throws TransactionException {
 		return new XATxContextBindingEntityManager(txControl, this, uuid, tx);
 	}
+
+	public void unregister() {
+		if(reg != null) {
+			try {
+				reg.unregister();
+			} catch (IllegalStateException ise) {
+				LOG.debug("An exception occurred when unregistering the recovery service for {}", recoveryIdentifier);
+			}
+		}
+	}
 }
diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/RecoverableXAResourceImpl.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/RecoverableXAResourceImpl.java
new file mode 100644
index 0000000..229b443
--- /dev/null
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/RecoverableXAResourceImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.tx.control.jpa.xa.impl;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.aries.tx.control.jpa.xa.impl.JPAEntityManagerProviderFactoryImpl.EnlistingDataSource;
+import org.osgi.service.transaction.control.recovery.RecoverableXAResource;
+
+public class RecoverableXAResourceImpl implements RecoverableXAResource {
+
+	private final String id;
+	
+	private final EnlistingDataSource ds;
+	
+	private final String recoveryUser;
+	
+	private final String recoveryPw;
+	
+	public RecoverableXAResourceImpl(String id, EnlistingDataSource ds, String recoveryUser,
+			String recoveryPw) {
+		this.id = id;
+		this.ds = ds;
+		this.recoveryUser = recoveryUser;
+		this.recoveryPw = recoveryPw;
+	}
+
+	@Override
+	public String getId() {
+		return id;
+	}
+
+	@Override
+	public XAResource getXAResource() throws Exception {
+		Connection recoveryConn;
+		if(recoveryUser != null) {
+			recoveryConn = ds.getRawConnection(recoveryUser, recoveryPw);
+		} else {
+			recoveryConn = ds.getRawConnection();
+		}
+		
+		return new CloseableXAResource(recoveryConn);
+	}
+
+	@Override
+	public void releaseXAResource(XAResource xaRes) {
+		if(xaRes instanceof CloseableXAResource) {
+			try {
+				((CloseableXAResource) xaRes).close();
+			} catch (Exception e) {
+				// This is fine, the connection has been returned
+			}
+		} else {
+			throw new IllegalArgumentException("The XAResource being returned was not created by this provider implementation");
+		}
+	}
+
+	private static class CloseableXAResource implements XAResource, AutoCloseable {
+		private final Connection conn;
+		
+		private final XAResource resource;
+		
+		public CloseableXAResource(Connection conn) throws SQLException {
+			conn.isValid(5);
+			this.conn = conn;
+			this.resource = EnlistingDataSource.getXAResource(conn);
+		}
+
+		@Override
+		public void close() throws Exception {
+			conn.close();
+		}
+
+		public void commit(Xid arg0, boolean arg1) throws XAException {
+			resource.commit(arg0, arg1);
+		}
+
+		public void end(Xid arg0, int arg1) throws XAException {
+			resource.end(arg0, arg1);
+		}
+
+		public void forget(Xid arg0) throws XAException {
+			resource.forget(arg0);
+		}
+
+		public int getTransactionTimeout() throws XAException {
+			return resource.getTransactionTimeout();
+		}
+
+		public boolean isSameRM(XAResource arg0) throws XAException {
+			return resource.isSameRM(arg0);
+		}
+
+		public int prepare(Xid arg0) throws XAException {
+			return resource.prepare(arg0);
+		}
+
+		public Xid[] recover(int arg0) throws XAException {
+			return resource.recover(arg0);
+		}
+
+		public void rollback(Xid arg0) throws XAException {
+			resource.rollback(arg0);
+		}
+
+		public boolean setTransactionTimeout(int arg0) throws XAException {
+			return resource.setTransactionTimeout(arg0);
+		}
+
+		public void start(Xid arg0, int arg1) throws XAException {
+			resource.start(arg0, arg1);
+		}
+	}
+}
diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java
index 73398c6..9ed4043 100644
--- a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java
@@ -82,7 +82,7 @@ public class XATxContextBindingEntityManager extends EntityManagerWrapper {
 				toClose.joinTransaction();
 			} else {
 				throw new TransactionException(
-						"There is a transaction active, but it does not support local participants");
+						"There is a transaction active, but it does not support xa participants");
 			}
 		} catch (Exception sqle) {
 			commonTxStore.set(previous);
diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java
index 7f27829..254f3ca 100644
--- a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java
@@ -88,7 +88,7 @@ public class XATxContextBindingEntityManagerTest {
 		Mockito.when(context.getScopedValue(Mockito.any()))
 			.thenAnswer(i -> variables.get(i.getArguments()[0]));
 		
-		provider = new JPAEntityManagerProviderImpl(emf, commonTxControl, null);
+		provider = new JPAEntityManagerProviderImpl(emf, commonTxControl, null, null, null, null);
 		
 		em = new XATxContextBindingEntityManager(control, provider, id, commonTxControl);
 	}

-- 
To stop receiving notification emails like this one, please contact
timothyjward@apache.org.