You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by li...@apache.org on 2010/06/28 23:00:17 UTC
svn commit: r958733 - in /incubator/aries/sandbox/linsun/mds:
mds-impl/src/main/java/org/apache/aries/mds/impl/
mds-impl/src/main/java/org/apache/aries/mds/impl/transaction/
mds-sample/src/main/resources/OSGI-INF/blueprint/
Author: linsun
Date: Mon Jun 28 21:00:16 2010
New Revision: 958733
URL: http://svn.apache.org/viewvc?rev=958733&view=rev
Log:
[message driven service]add ThreadContext to check if the correct transaction policy, method names are called between before delivery, after delivery and delivery
Added:
incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/ThreadContext.java (with props)
Modified:
incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/Activator.java
incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/EndpointHandler.java
incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/JmsEndpointFactory.java
incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/MDSContainerImpl.java
incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/transaction/JtaTransactionPolicy.java
incubator/aries/sandbox/linsun/mds/mds-sample/src/main/resources/OSGI-INF/blueprint/config.xml
Modified: incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/Activator.java
URL: http://svn.apache.org/viewvc/incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/Activator.java?rev=958733&r1=958732&r2=958733&view=diff
==============================================================================
--- incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/Activator.java (original)
+++ incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/Activator.java Mon Jun 28 21:00:16 2010
@@ -51,7 +51,7 @@ public class Activator implements Bundle
public void start(BundleContext context) throws Exception {
System.out.println("aries-mds-impl starts");
- this.context = context;
+ Activator.context = context;
st = createServiceTracker();
st.open();
@@ -167,7 +167,7 @@ public class Activator implements Bundle
flt = "(" + Constants.OBJECTCLASS + "=" + type.getName() + ")";
}
Filter osgiFilter = FrameworkUtil.createFilter(flt);
- tracker = new ServiceTracker(bc == null ? this.context : bc, osgiFilter, null);
+ tracker = new ServiceTracker(bc == null ? Activator.context : bc, osgiFilter, null);
tracker.open();
// add tracker to the list of trackers we close at tear down
Modified: incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/EndpointHandler.java
URL: http://svn.apache.org/viewvc/incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/EndpointHandler.java?rev=958733&r1=958732&r2=958733&view=diff
==============================================================================
--- incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/EndpointHandler.java (original)
+++ incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/EndpointHandler.java Mon Jun 28 21:00:16 2010
@@ -19,7 +19,6 @@
package org.apache.aries.mds.impl;
import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
@@ -29,8 +28,8 @@ import javax.resource.spi.UnavailableExc
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.transaction.xa.XAResource;
-import org.apache.aries.mds.MDSContainer;
import org.apache.aries.mds.impl.exception.ApplicationException;
+import org.apache.aries.mds.impl.exception.MDSException;
import org.apache.aries.mds.impl.exception.SystemException;
import org.apache.aries.mds.impl.transaction.TransactionPolicy;
@@ -113,7 +112,7 @@ public class EndpointHandler implements
}
}
- private Object deliverMessage(Method method, Object[] args) throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
+ private Object deliverMessage(Method method, Object[] args) throws Throwable {
boolean callBeforeAfter = false;
@@ -141,7 +140,12 @@ public class EndpointHandler implements
Throwable throwable = null;
Object value = null;
try {
- value = method.invoke(this.instance, args);
+ value = this.container.invoke(this.instance, method, args);
+ } catch (SystemException se) {
+ throwable = (se.getRootCause() != null) ? se.getRootCause() : se;
+ state = State.SYSTEM_EXCEPTION;
+ } catch (ApplicationException ae) {
+ throwable = (ae.getRootCause() != null) ? ae.getRootCause() : ae;
} finally {
// if the adapter is not using before/after, we must call afterDelivery to clean up
if (callBeforeAfter) {
@@ -157,6 +161,16 @@ public class EndpointHandler implements
}
}
}
+
+ if (throwable != null) {
+ throwable.printStackTrace();
+ if (isValidException(method, throwable)) {
+ throw throwable;
+ } else {
+ throw new MDSException().initCause(throwable);
+ }
+ }
+
return value;
}
@@ -174,24 +188,12 @@ public class EndpointHandler implements
// call afterDelivery on the container
try {
- container.afterDelivery(txPolicy);
- } catch (SystemException se) {
- Throwable throwable = (se.getRootCause() != null) ? se.getRootCause() : se;
- throw new ApplicationServerInternalException(throwable);
- }
- boolean exceptionThrown = false;
- try {
- // TODO invoke after delivery actions
+ container.afterDelivery();
} catch (Exception se) {
- exceptionThrown = true;
-
Throwable throwable = (se.getCause() != null) ? se.getCause() : se;
throwable.printStackTrace();
throw new ApplicationServerInternalException(throwable);
} finally {
- /*if (state == State.SYSTEM_EXCEPTION) {
- recreateInstance(exceptionThrown);
- }*/
// we are now in the default NONE state
state = State.NONE;
}
@@ -230,5 +232,17 @@ public class EndpointHandler implements
// free instance?
}
+
+ private boolean isValidException(Method method, Throwable throwable) {
+ if (throwable instanceof RuntimeException || throwable instanceof Error) return true;
+
+ Class<?>[] exceptionTypes = method.getExceptionTypes();
+ for (Class<?> exceptionType : exceptionTypes) {
+ if (exceptionType.isInstance(throwable)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
Modified: incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/JmsEndpointFactory.java
URL: http://svn.apache.org/viewvc/incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/JmsEndpointFactory.java?rev=958733&r1=958732&r2=958733&view=diff
==============================================================================
--- incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/JmsEndpointFactory.java (original)
+++ incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/JmsEndpointFactory.java Mon Jun 28 21:00:16 2010
@@ -28,7 +28,6 @@ import javax.resource.spi.endpoint.Messa
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAResource;
-import org.apache.aries.mds.MDSContainer;
import org.apache.aries.mds.impl.exception.ApplicationException;
import org.apache.aries.mds.impl.exception.SystemException;
import org.apache.aries.mds.impl.transaction.TransactionPolicy;
Modified: incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/MDSContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/MDSContainerImpl.java?rev=958733&r1=958732&r2=958733&view=diff
==============================================================================
--- incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/MDSContainerImpl.java (original)
+++ incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/MDSContainerImpl.java Mon Jun 28 21:00:16 2010
@@ -18,7 +18,9 @@
*/
package org.apache.aries.mds.impl;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.Arrays;
import javax.jms.MessageListener;
import javax.resource.ResourceException;
@@ -30,17 +32,19 @@ import javax.transaction.xa.XAResource;
import org.apache.aries.mds.MDSContainer;
import org.apache.aries.mds.impl.exception.ApplicationException;
+import org.apache.aries.mds.impl.exception.MDSException;
import org.apache.aries.mds.impl.exception.SystemException;
import org.apache.aries.mds.impl.transaction.JtaTransactionPolicyFactory;
import org.apache.aries.mds.impl.transaction.TransactionPolicy;
import org.apache.aries.mds.impl.transaction.TransactionPolicyFactory;
-import org.apache.aries.mds.impl.transaction.TransactionType;
import org.apache.xbean.recipe.ObjectRecipe;
import org.apache.xbean.recipe.Option;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MDSContainerImpl implements MDSContainer {
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(MDSContainerImpl.class);
private final ResourceAdapter ra;
private final ServiceReference sr;
private final BundleContext bc;
@@ -64,6 +68,11 @@ public class MDSContainerImpl implements
// TODO Attempt to load other activation spec impl class
e.printStackTrace();
}
+
+ Object pid = this.sr.getProperty("service.pid");
+ if (pid == null) {
+ throw new NullPointerException("service.pid has to be specified as a required service property");
+ }
if (activationSpecClass == null) {
throw new NullPointerException(
@@ -153,30 +162,180 @@ public class MDSContainerImpl implements
public void beforeDelivery(Method method, XAResource xaResource, TransactionPolicy txPolicy) throws SystemException {
+ ThreadContext callContext = new ThreadContext((String)sr.getProperty("service.pid"), txPolicy);
+ ThreadContext oldContext = ThreadContext.enter(callContext);
+
+ MdbCallContext mdbCallContext = new MdbCallContext();
+ callContext.set(MdbCallContext.class, mdbCallContext);
+ mdbCallContext.deliveryMethod = method;
+ mdbCallContext.oldCallContext = oldContext;
+ mdbCallContext.txPolicy = txPolicy;
try {
// if we have an xaResource and a transaction was not imported from the adapter, enlist the xaResource
- if (xaResource != null && txPolicy.isNewTransaction()) {
- txPolicy.enlistResource(xaResource);
+ if (xaResource != null && mdbCallContext.txPolicy.isNewTransaction()) {
+ mdbCallContext.txPolicy.enlistResource(xaResource);
}
} catch (Exception e) {
throw new SystemException("Unable to enlist xa resource in the transaction", e);
}
}
-
- public void afterDelivery(TransactionPolicy txPolicy) throws SystemException {
+ public Object invoke(Object instance, Method method, Object... args) throws SystemException, ApplicationException {
+ if (args == null) {
+ args = new Object[0];
+ }
+
+ // get the context data
+ ThreadContext callContext = ThreadContext.getThreadContext();
+ MdbCallContext mdbCallContext = callContext.get(MdbCallContext.class);
+
+ if (mdbCallContext == null) {
+ throw new IllegalStateException("beforeDelivery was not called");
+ }
+
+
+ // verify the delivery method passed to beforeDeliver is the same method that was invoked
+ if (!mdbCallContext.deliveryMethod.getName().equals(method.getName()) ||
+ !Arrays.deepEquals(mdbCallContext.deliveryMethod.getParameterTypes(), method.getParameterTypes())) {
+ throw new IllegalStateException("Delivery method specified in beforeDelivery is not the delivery method called");
+ }
+
+ LOGGER.info("invoking method " + method.getName() + "service.pid " + callContext.getServicePid());
+
+ Object value = null;
+ MDSException mdsException = null;
try {
- txPolicy.commit();
- } catch (Exception e) {
- // TODO handle different exception differently
+ value = method.invoke(instance, args);
+ } catch (Throwable e) {
+ // unwrap invocation target exception
+ if (e instanceof InvocationTargetException) {
+ e = ((InvocationTargetException) e).getTargetException();
+ }
+
+ // Any exception thrown by reflection; not by the enterprise bean. Possible
+ // Exceptions are:
+ // IllegalAccessException - if the underlying method is inaccessible.
+ // IllegalArgumentException - if the number of actual and formal parameters differ, or if an unwrapping conversion fails.
+ // NullPointerException - if the specified object is null and the method is an instance method.
+ // ExceptionInInitializerError - if the initialization provoked by this method fails.
+
+ // TODO: need to differentiate system exception and application exception
+ if (isApplicationException(e, method)) {
+ try {
+ handleApplicationException(mdbCallContext.txPolicy, e);
+ } catch (ApplicationException ex) {
+ mdsException = ex;
+ }
+ } else {
+ try {
+ handleSystemException(mdbCallContext.txPolicy, e);
+ } catch (SystemException ex) {
+ mdsException = ex;
+ throw ex;
+ }
+ }
+ } finally {
+ // log finished invoking method
+ // Log the invocation results
+ if (LOGGER.isDebugEnabled()) {
+ if (mdsException == null) {
+ LOGGER.debug("finished invoking method " + method.getName() + ". Return value:" + value);
+ } else {
+ Throwable exception = (mdsException.getRootCause() != null) ? mdsException.getRootCause() : mdsException;
+ LOGGER.debug("finished invoking method " + method.getName() + " with exception " + exception);
+ }
+ }
+ }
+
+ return value;
+
+ }
+
+ private void handleSystemException(TransactionPolicy txPolicy, Throwable sysException) throws SystemException {
+ // Mark the transaction for rollback
+ txPolicy.setRollbackOnly();
+
+ throw new SystemException(sysException);
+ }
+
+ private void handleApplicationException(TransactionPolicy txPolicy, Throwable appException) throws ApplicationException {
+ // we don't mark the transaction for rollback for application exception
+ if (!(appException instanceof ApplicationException)) {
+ throw new ApplicationException(appException);
+ }
+ }
+
+ private boolean isApplicationException(Throwable throwable, Method method) {
+ if (throwable instanceof RuntimeException || throwable instanceof Error) return false;
+
+ // check to see if it is declared method exceptions
+ Class<?>[] exceptionTypes = method.getExceptionTypes();
+ for (Class<?> exceptionType : exceptionTypes) {
+ if (exceptionType.isInstance(throwable)) {
+ return true;
+ }
+ }
+
+ return false;
+
+ }
+
+ public void afterDelivery() throws SystemException {
+ ThreadContext callContext = ThreadContext.getThreadContext();
+ MdbCallContext mdbCallContext = callContext.get(MdbCallContext.class);
+
+ if (callContext.getTransactionPolicy() == mdbCallContext.txPolicy) {
+ // Everything is in order, complete the transaction
+ try {
+ mdbCallContext.txPolicy.commit();
+ } catch (Exception e) {
+ // TODO handle different exception differently
+ try {
+ mdbCallContext.txPolicy.setRollbackOnly();
+ mdbCallContext.txPolicy.commit();
+ } catch (ApplicationException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+ } finally {
+ callContext.setTransactionPolicy(null);
+ ThreadContext.exit(mdbCallContext.oldCallContext);
+ }
+ } else {
+ // System is corrupted... roll back both transactions
try {
- txPolicy.setRollbackOnly();
- txPolicy.commit();
- } catch (ApplicationException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
+ mdbCallContext.txPolicy.setRollbackOnly();
+ mdbCallContext.txPolicy.commit();
+ } catch (Exception e) {
+ LOGGER.error("Error rolling back transaction", e);
+ }
+
+ TransactionPolicy threadContextTxPolicy = callContext.getTransactionPolicy();
+ if (threadContextTxPolicy != null) {
+ try {
+ threadContextTxPolicy.setRollbackOnly();
+ threadContextTxPolicy.commit();
+ } catch (Exception e) {
+ //callContext.setDiscardInstance(true);
+ LOGGER.error("Error rolling back transaction", e);
+ }
}
+
+ ThreadContext.exit(mdbCallContext.oldCallContext);
+
+ if (threadContextTxPolicy != null) {
+ throw new SystemException(new IllegalStateException("ThreadContext is bound to another transaction " + threadContextTxPolicy));
+ } else {
+ throw new SystemException(new IllegalStateException("ThreadContext is not bound to specified transaction " + threadContextTxPolicy));
+ }
+
}
+
}
+ private static class MdbCallContext {
+ private Method deliveryMethod;
+ private TransactionPolicy txPolicy;
+ private ThreadContext oldCallContext;
+ }
}
Added: incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/ThreadContext.java
URL: http://svn.apache.org/viewvc/incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/ThreadContext.java?rev=958733&view=auto
==============================================================================
--- incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/ThreadContext.java (added)
+++ incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/ThreadContext.java Mon Jun 28 21:00:16 2010
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.mds.impl;
+
+import java.util.HashMap;
+
+import org.apache.aries.mds.impl.transaction.TransactionPolicy;
+
+//resue code from apache openejb project with modification
+public class ThreadContext {
+ private static final ThreadLocal<ThreadContext> threadStorage = new ThreadLocal<ThreadContext>();
+
+ public static ThreadContext getThreadContext() {
+ ThreadContext threadContext = threadStorage.get();
+ return threadContext;
+ }
+
+ public static ThreadContext enter(ThreadContext newContext) {
+ if (newContext == null) {
+ throw new NullPointerException("newContext is null");
+ }
+
+ // update thread local
+ ThreadContext oldContext = threadStorage.get();
+ threadStorage.set(newContext);
+
+ // return old context so it can be used for exit call below
+ return oldContext;
+ }
+
+ public static void exit(ThreadContext oldContext) {
+ ThreadContext exitingContext = threadStorage.get();
+ if (exitingContext == null) {
+ throw new IllegalStateException("No existing context");
+ }
+
+ // update thread local
+ threadStorage.set(oldContext);
+ }
+
+ private final HashMap<Class, Object> data = new HashMap<Class, Object>();
+ private TransactionPolicy transactionPolicy;
+ private String servicePid;
+
+ public ThreadContext(ThreadContext that) {
+ this.data.putAll(that.data);
+ }
+
+ public ThreadContext(String servicePid, TransactionPolicy transactionPolicy) {
+ this.servicePid = servicePid;
+ this.transactionPolicy = transactionPolicy;
+ }
+
+ public TransactionPolicy getTransactionPolicy() {
+ return transactionPolicy;
+ }
+
+ public void setTransactionPolicy(TransactionPolicy transactionPolicy) {
+ this.transactionPolicy = transactionPolicy;
+ }
+
+ @SuppressWarnings({"unchecked"})
+ public <T> T get(Class<T> type) {
+ return (T)data.get(type);
+ }
+
+ @SuppressWarnings({"unchecked"})
+ public <T> T set(Class<T> type, T value) {
+ return (T) data.put(type, value);
+ }
+
+ public String getServicePid() {
+ return this.servicePid;
+ }
+}
Propchange: incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/ThreadContext.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/ThreadContext.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/ThreadContext.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/transaction/JtaTransactionPolicy.java
URL: http://svn.apache.org/viewvc/incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/transaction/JtaTransactionPolicy.java?rev=958733&r1=958732&r2=958733&view=diff
==============================================================================
--- incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/transaction/JtaTransactionPolicy.java (original)
+++ incubator/aries/sandbox/linsun/mds/mds-impl/src/main/java/org/apache/aries/mds/impl/transaction/JtaTransactionPolicy.java Mon Jun 28 21:00:16 2010
@@ -39,6 +39,7 @@ import org.apache.aries.mds.impl.excepti
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+// resue code from apache openejb project with modification
public abstract class JtaTransactionPolicy implements TransactionPolicy {
private static final Logger LOGGER = LoggerFactory.getLogger(JtaTransactionPolicy.class);
Modified: incubator/aries/sandbox/linsun/mds/mds-sample/src/main/resources/OSGI-INF/blueprint/config.xml
URL: http://svn.apache.org/viewvc/incubator/aries/sandbox/linsun/mds/mds-sample/src/main/resources/OSGI-INF/blueprint/config.xml?rev=958733&r1=958732&r2=958733&view=diff
==============================================================================
--- incubator/aries/sandbox/linsun/mds/mds-sample/src/main/resources/OSGI-INF/blueprint/config.xml (original)
+++ incubator/aries/sandbox/linsun/mds/mds-sample/src/main/resources/OSGI-INF/blueprint/config.xml Mon Jun 28 21:00:16 2010
@@ -29,7 +29,7 @@
<entry key="ac:destination" value="Hello.Queue" />
<entry key="ac:destinationType" value="javax.jms.Queue" />
<entry key="transactionAttribute" value="Required" />
- <!-- <entry key="service.pid" value="org.apache.aries.mds.sample.myMessageDrivenBean" /> -->
+ <entry key="service.pid" value="org.apache.aries.mds.sample.myMessageDrivenBean" />
</service-properties>
</service>