You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by ti...@apache.org on 2018/01/25 17:08:14 UTC
[aries-tx-control] 02/02: Tx Control spec compliance - the XA JPA
provider must support recovery to be the reference implementation
This is an automated email from the ASF dual-hosted git repository.
timothyjward pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-tx-control.git
commit 4f155dd818e87b584244647f18893d7c460b6c05
Author: Tim Ward <ti...@apache.org>
AuthorDate: Thu Jan 25 12:05:34 2018 -0500
Tx Control spec compliance - the XA JPA provider must support recovery to be the reference implementation
---
.../aries/tx/control/jpa/xa/impl/Activator.java | 5 +-
.../impl/JPAEntityManagerProviderFactoryImpl.java | 61 ++++++++--
.../jpa/xa/impl/JPAEntityManagerProviderImpl.java | 45 ++++++-
.../jpa/xa/impl/RecoverableXAResourceImpl.java | 135 +++++++++++++++++++++
.../xa/impl/XATxContextBindingEntityManager.java | 2 +-
.../impl/XATxContextBindingEntityManagerTest.java | 2 +-
6 files changed, 236 insertions(+), 14 deletions(-)
diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/Activator.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/Activator.java
index 9686e7f..50be0fe 100644
--- a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/Activator.java
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/Activator.java
@@ -18,6 +18,8 @@
*/
package org.apache.aries.tx.control.jpa.xa.impl;
+import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.XA_ENLISTMENT_ENABLED;
+
import java.util.Dictionary;
import java.util.Hashtable;
@@ -49,7 +51,8 @@ public class Activator extends JPAResourceActivator {
@Override
protected Dictionary<String, Object> getServiceProperties() {
Dictionary<String, Object> props = new Hashtable<>();
- props.put("osgi.xa.enabled", Boolean.TRUE);
+ props.put(XA_ENLISTMENT_ENABLED, Boolean.TRUE);
+ props.put("osgi.recovery.enabled", Boolean.TRUE);
return props;
}
diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
index 51c112c..c171faf 100644
--- a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
@@ -24,6 +24,7 @@ import static org.apache.aries.tx.control.jpa.xa.impl.XAJPADataSourceSetup.JTA_D
import static org.apache.aries.tx.control.jpa.xa.impl.XAJPADataSourceSetup.NON_JTA_DATA_SOURCE;
import static org.osgi.service.transaction.control.TransactionStatus.NO_TRANSACTION;
import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.LOCAL_ENLISTMENT_ENABLED;
+import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.OSGI_RECOVERY_IDENTIFIER;
import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.PRE_ENLISTED_DB_CONNECTION;
import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.TRANSACTIONAL_DB_CONNECTION;
import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.XA_ENLISTMENT_ENABLED;
@@ -112,6 +113,11 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
Function<ThreadLocal<TransactionControl>, AbstractJPAEntityManagerProvider> create;
JDBCConnectionProvider provider = (JDBCConnectionProvider) resourceProviderProperties.get(TRANSACTIONAL_DB_CONNECTION);
+ if(resourceProviderProperties.containsKey(OSGI_RECOVERY_IDENTIFIER)) {
+ LOGGER.warn("Unable to set a recovery identifier {}. This should be set when creating the JDBC Resource Provider",
+ resourceProviderProperties.remove(OSGI_RECOVERY_IDENTIFIER));
+ }
+
create = tx -> {
jpaPropsToUse.put(JTA_DATA_SOURCE,
new ScopedConnectionDataSource(provider.getResource(tx.get())));
@@ -131,6 +137,12 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
LOGGER.error("No datasource supplied in the configuration");
throw new IllegalArgumentException("No pre-enlisted datasource could be found to create the EntityManagerFactory. Please provide either a javax.persistence.jtaDataSource");
}
+
+ if(resourceProviderProperties.containsKey(OSGI_RECOVERY_IDENTIFIER)) {
+ LOGGER.warn("Unable to set a recovery identifier {} for a pre-enlisted connection",
+ resourceProviderProperties.remove(OSGI_RECOVERY_IDENTIFIER));
+ }
+
create = tx -> {
DataSource toUse = JPADataSourceHelper.poolIfNecessary(resourceProviderProperties, (DataSource) supplied);
jpaPropsToUse.put(JTA_DATA_SOURCE, toUse);
@@ -200,14 +212,14 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
Map<String, Object> resourceProviderProperties, ThreadLocal<TransactionControl> localStore, Runnable onClose) {
Map<String, Object> toUse;
if(checkEnlistment(resourceProviderProperties)) {
- toUse = enlistDataSource(localStore, jpaProperties);
+ toUse = enlistDataSource(localStore, jpaProperties, resourceProviderProperties);
} else {
toUse = jpaProperties;
}
setupTransactionManager(context, toUse, localStore, emfb);
- return localStore.get().notSupported(() -> internalBuilderCreate(emfb, toUse, localStore, onClose));
+ return localStore.get().notSupported(() -> internalBuilderCreate(emfb, toUse, resourceProviderProperties, localStore, onClose));
}
private boolean checkEnlistment(Map<String, Object> resourceProviderProperties) {
@@ -221,11 +233,12 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
return !toBoolean(resourceProviderProperties, PRE_ENLISTED_DB_CONNECTION, false);
}
- private Map<String, Object> enlistDataSource(ThreadLocal<TransactionControl> tx, Map<String, Object> jpaProperties) {
+ private Map<String, Object> enlistDataSource(ThreadLocal<TransactionControl> tx, Map<String, Object> jpaProperties, Map<String, Object> resourceProviderProperties) {
Map<String, Object> toReturn = new HashMap<>(jpaProperties);
DataSource enlistedDS = new EnlistingDataSource(tx,
- (DataSource)jpaProperties.get(JTA_DATA_SOURCE));
+ (DataSource)jpaProperties.get(JTA_DATA_SOURCE),
+ (String) resourceProviderProperties.get(OSGI_RECOVERY_IDENTIFIER));
toReturn.put(JTA_DATA_SOURCE, enlistedDS);
@@ -356,7 +369,8 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
}
private AbstractJPAEntityManagerProvider internalBuilderCreate(EntityManagerFactoryBuilder emfb,
- Map<String, Object> jpaProperties, ThreadLocal<TransactionControl> tx, Runnable onClose) {
+ Map<String, Object> jpaProperties, Map<String, Object> providerProperties,
+ ThreadLocal<TransactionControl> tx, Runnable onClose) {
EntityManagerFactory emf = emfb.createEntityManagerFactory(jpaProperties);
validateEMF(emf);
@@ -369,7 +383,7 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
if (onClose != null) {
onClose.run();
}
- });
+ }, context, jpaProperties, providerProperties);
}
private void validateEMF(EntityManagerFactory emf) {
@@ -396,7 +410,13 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
checkEnlistment(resourceProviderProperties);
validateEMF(emf);
- return new JPAEntityManagerProviderImpl(emf, new ThreadLocal<>(), null);
+ if(resourceProviderProperties.containsKey(OSGI_RECOVERY_IDENTIFIER)) {
+ LOGGER.warn("Unable to set a recovery identifier {} for an existing EntityManagerFactory",
+ resourceProviderProperties.remove(OSGI_RECOVERY_IDENTIFIER));
+ }
+
+ return new JPAEntityManagerProviderImpl(emf, new ThreadLocal<>(), null, null,
+ null, null);
}
public static boolean toBoolean(Map<String, Object> props, String key, boolean defaultValue) {
@@ -460,14 +480,18 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
public static class EnlistingDataSource implements DataSource {
private final DataSource delegate;
+
+ private final String recoveryIdentifier;
private final UUID resourceId = UUID.randomUUID();
private final ThreadLocal<TransactionControl> txControlToUse;
- public EnlistingDataSource(ThreadLocal<TransactionControl> txControlToUse, DataSource delegate) {
+ public EnlistingDataSource(ThreadLocal<TransactionControl> txControlToUse,
+ DataSource delegate, String recoveryIdentifier) {
this.txControlToUse = txControlToUse;
this.delegate = delegate;
+ this.recoveryIdentifier = recoveryIdentifier;
}
public TransactionControl getTxControl() {
@@ -498,6 +522,23 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
return enlistedConnection(() -> delegate.getConnection());
}
+ /**
+ * Used by the {@link RecoverableXAResourceImpl}
+ * @return
+ * @throws SQLException
+ */
+ Connection getRawConnection() throws SQLException {
+ return delegate.getConnection();
+ }
+ /**
+ * Used by the {@link RecoverableXAResourceImpl}
+ * @return
+ * @throws SQLException
+ */
+ Connection getRawConnection(String username, String password) throws SQLException {
+ return delegate.getConnection(username, password);
+ }
+
public void setLoginTimeout(int seconds) throws SQLException {
delegate.setLoginTimeout(seconds);
}
@@ -537,7 +578,7 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
toReturn = new ScopedConnectionWrapper(toClose);
} else if (txContext.supportsXA()) {
toReturn = new TxConnectionWrapper(toClose);
- txContext.registerXAResource(getXAResource(toClose), null);
+ txContext.registerXAResource(getXAResource(toClose), recoveryIdentifier);
} else {
throw new TransactionException(
"There is a transaction active, but it does not support XA participants");
@@ -562,7 +603,7 @@ public class JPAEntityManagerProviderFactoryImpl implements InternalJPAEntityMan
return toReturn;
}
- private XAResource getXAResource(Connection conn) throws SQLException {
+ public static XAResource getXAResource(Connection conn) throws SQLException {
if(conn instanceof XAConnectionWrapper) {
return ((XAConnectionWrapper)conn).getXaResource();
} else if(conn.isWrapperFor(XAConnectionWrapper.class)){
diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderImpl.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderImpl.java
index b040f1f..3005866 100644
--- a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderImpl.java
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderImpl.java
@@ -18,29 +18,72 @@
*/
package org.apache.aries.tx.control.jpa.xa.impl;
+import static java.util.Optional.ofNullable;
+import static org.apache.aries.tx.control.jpa.xa.impl.XAJPADataSourceSetup.JTA_DATA_SOURCE;
+import static org.osgi.service.transaction.control.jpa.JPAEntityManagerProviderFactory.OSGI_RECOVERY_IDENTIFIER;
+
+import java.util.Map;
import java.util.UUID;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import org.apache.aries.tx.control.jpa.common.impl.AbstractJPAEntityManagerProvider;
+import org.apache.aries.tx.control.jpa.xa.impl.JPAEntityManagerProviderFactoryImpl.EnlistingDataSource;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
import org.osgi.service.transaction.control.TransactionControl;
import org.osgi.service.transaction.control.TransactionException;
+import org.osgi.service.transaction.control.recovery.RecoverableXAResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class JPAEntityManagerProviderImpl extends AbstractJPAEntityManagerProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(JPAEntityManagerProviderImpl.class);
+
private final UUID uuid = UUID.randomUUID();
private final ThreadLocal<TransactionControl> tx;
+
+ private final String recoveryIdentifier;
+
+ private final ServiceRegistration<RecoverableXAResource> reg;
public JPAEntityManagerProviderImpl(EntityManagerFactory emf, ThreadLocal<TransactionControl> tx,
- Runnable onClose) {
+ Runnable onClose, BundleContext ctx, Map<String, Object> jpaProps,
+ Map<String, Object> providerProps) {
super(emf, onClose);
this.tx = tx;
+
+ recoveryIdentifier = (String) ofNullable(providerProps)
+ .map(m -> m.get(OSGI_RECOVERY_IDENTIFIER))
+ .orElse(null);
+
+ if(recoveryIdentifier != null) {
+ EnlistingDataSource ds = (EnlistingDataSource) jpaProps.get(JTA_DATA_SOURCE);
+ reg = ctx.registerService(RecoverableXAResource.class,
+ new RecoverableXAResourceImpl(recoveryIdentifier, ds,
+ (String) providerProps.get("recovery.user"),
+ (String) providerProps.get(".recovery.password)")),
+ null);
+ } else {
+ reg = null;
+ }
}
@Override
public EntityManager getResource(TransactionControl txControl) throws TransactionException {
return new XATxContextBindingEntityManager(txControl, this, uuid, tx);
}
+
+ public void unregister() {
+ if(reg != null) {
+ try {
+ reg.unregister();
+ } catch (IllegalStateException ise) {
+ LOG.debug("An exception occurred when unregistering the recovery service for {}", recoveryIdentifier);
+ }
+ }
+ }
}
diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/RecoverableXAResourceImpl.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/RecoverableXAResourceImpl.java
new file mode 100644
index 0000000..229b443
--- /dev/null
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/RecoverableXAResourceImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.tx.control.jpa.xa.impl;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.aries.tx.control.jpa.xa.impl.JPAEntityManagerProviderFactoryImpl.EnlistingDataSource;
+import org.osgi.service.transaction.control.recovery.RecoverableXAResource;
+
+public class RecoverableXAResourceImpl implements RecoverableXAResource {
+
+ private final String id;
+
+ private final EnlistingDataSource ds;
+
+ private final String recoveryUser;
+
+ private final String recoveryPw;
+
+ public RecoverableXAResourceImpl(String id, EnlistingDataSource ds, String recoveryUser,
+ String recoveryPw) {
+ this.id = id;
+ this.ds = ds;
+ this.recoveryUser = recoveryUser;
+ this.recoveryPw = recoveryPw;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public XAResource getXAResource() throws Exception {
+ Connection recoveryConn;
+ if(recoveryUser != null) {
+ recoveryConn = ds.getRawConnection(recoveryUser, recoveryPw);
+ } else {
+ recoveryConn = ds.getRawConnection();
+ }
+
+ return new CloseableXAResource(recoveryConn);
+ }
+
+ @Override
+ public void releaseXAResource(XAResource xaRes) {
+ if(xaRes instanceof CloseableXAResource) {
+ try {
+ ((CloseableXAResource) xaRes).close();
+ } catch (Exception e) {
+ // This is fine, the connection has been returned
+ }
+ } else {
+ throw new IllegalArgumentException("The XAResource being returned was not created by this provider implementation");
+ }
+ }
+
+ private static class CloseableXAResource implements XAResource, AutoCloseable {
+ private final Connection conn;
+
+ private final XAResource resource;
+
+ public CloseableXAResource(Connection conn) throws SQLException {
+ conn.isValid(5);
+ this.conn = conn;
+ this.resource = EnlistingDataSource.getXAResource(conn);
+ }
+
+ @Override
+ public void close() throws Exception {
+ conn.close();
+ }
+
+ public void commit(Xid arg0, boolean arg1) throws XAException {
+ resource.commit(arg0, arg1);
+ }
+
+ public void end(Xid arg0, int arg1) throws XAException {
+ resource.end(arg0, arg1);
+ }
+
+ public void forget(Xid arg0) throws XAException {
+ resource.forget(arg0);
+ }
+
+ public int getTransactionTimeout() throws XAException {
+ return resource.getTransactionTimeout();
+ }
+
+ public boolean isSameRM(XAResource arg0) throws XAException {
+ return resource.isSameRM(arg0);
+ }
+
+ public int prepare(Xid arg0) throws XAException {
+ return resource.prepare(arg0);
+ }
+
+ public Xid[] recover(int arg0) throws XAException {
+ return resource.recover(arg0);
+ }
+
+ public void rollback(Xid arg0) throws XAException {
+ resource.rollback(arg0);
+ }
+
+ public boolean setTransactionTimeout(int arg0) throws XAException {
+ return resource.setTransactionTimeout(arg0);
+ }
+
+ public void start(Xid arg0, int arg1) throws XAException {
+ resource.start(arg0, arg1);
+ }
+ }
+}
diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java
index 73398c6..9ed4043 100644
--- a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java
@@ -82,7 +82,7 @@ public class XATxContextBindingEntityManager extends EntityManagerWrapper {
toClose.joinTransaction();
} else {
throw new TransactionException(
- "There is a transaction active, but it does not support local participants");
+ "There is a transaction active, but it does not support xa participants");
}
} catch (Exception sqle) {
commonTxStore.set(previous);
diff --git a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java
index 7f27829..254f3ca 100644
--- a/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java
+++ b/tx-control-providers/jpa/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java
@@ -88,7 +88,7 @@ public class XATxContextBindingEntityManagerTest {
Mockito.when(context.getScopedValue(Mockito.any()))
.thenAnswer(i -> variables.get(i.getArguments()[0]));
- provider = new JPAEntityManagerProviderImpl(emf, commonTxControl, null);
+ provider = new JPAEntityManagerProviderImpl(emf, commonTxControl, null, null, null, null);
em = new XATxContextBindingEntityManager(control, provider, id, commonTxControl);
}
--
To stop receiving notification emails like this one, please contact
timothyjward@apache.org.