You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by tv...@apache.org on 2018/01/18 15:46:04 UTC

[09/50] tomee git commit: create abstract end point handler

create abstract end point handler

(cherry picked from commit 8e2aad5b018d1435d212bebe39662bd58e338d57)


Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/a1992b41
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/a1992b41
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/a1992b41

Branch: refs/heads/tomee-1.7.x
Commit: a1992b41e410182ad263e586c576fe80bec97573
Parents: ac58b0d
Author: Otavio Santana <ot...@gmail.com>
Authored: Fri Dec 15 07:53:06 2017 -0300
Committer: Thiago Veronezi <th...@veronezi.org>
Committed: Wed Jan 17 11:33:08 2018 -0500

----------------------------------------------------------------------
 .../core/mdb/AbstractEndpointHandler.java       | 201 +++++++++++++++++++
 .../openejb/core/mdb/EndpointHandler.java       |   6 -
 .../openejb/core/mdb/PoolEndpointHandler.java   |   6 -
 3 files changed, 201 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tomee/blob/a1992b41/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/AbstractEndpointHandler.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/AbstractEndpointHandler.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/AbstractEndpointHandler.java
new file mode 100644
index 0000000..3df2438
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/AbstractEndpointHandler.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.core.mdb;
+
+import org.apache.openejb.ApplicationException;
+import org.apache.openejb.SystemException;
+import org.apache.openejb.resource.activemq.jms2.DelegateMessage;
+import org.apache.openejb.resource.activemq.jms2.JMS2;
+
+import javax.ejb.EJBException;
+import javax.jms.Message;
+import javax.resource.spi.ApplicationServerInternalException;
+import javax.resource.spi.UnavailableException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+/**
+ * Skeleton of a message endpoint handler: dispatches reflective proxy calls and tracks the
+ * delivery state across beforeDelivery, the message call itself and afterDelivery.
+ */
+abstract class AbstractEndpointHandler {
+
+    /** Position in the delivery life cycle; read and updated by deliverMessage and afterDelivery. */
+    protected State state = State.NONE;
+
+    /** Caches whether the delivered message class name starts with "org.apache.activemq."; null until first checked. */
+    protected volatile Boolean isAmq;
+
+    /** Target object handed to container.invoke and container.afterDelivery. */
+    protected Object instance;
+
+    // NOTE(review): originally annotated "//final" - presumably meant to become final; confirm.
+    /** Container that performs the invocation and the afterDelivery callback. */
+    protected BaseMdbContainer container;
+
+    /**
+     * Hook run before a message is delivered; deliverMessage calls it itself when the state is NONE.
+     *
+     * @param method the method about to be invoked
+     * @throws ApplicationServerInternalException declared by the hook; deliverMessage rethrows its cause inside an EJBException
+     */
+    public abstract void beforeDelivery(final Method method) throws ApplicationServerInternalException;
+
+    /**
+     * Hook called by afterDelivery when the state is SYSTEM_EXCEPTION.
+     *
+     * @param exceptionAlreadyThrown true when container.afterDelivery already failed with a SystemException
+     * @throws UnavailableException presumably when no replacement instance can be provided - confirm in subclasses
+     */
+    protected abstract void recreateInstance(final boolean exceptionAlreadyThrown) throws UnavailableException;
+
+    /** Hook called when {@code release()} is invoked through {@link #invoke}. */
+    public abstract void release();
+
+    /**
+     * Reflective entry point for calls made on the endpoint.
+     * <p>
+     * {@code toString}, {@code equals} and {@code hashCode} declared by {@code Object} are answered
+     * by this handler itself. {@code beforeDelivery(Method)}, {@code afterDelivery()} and
+     * {@code release()} are routed to the methods of the same name. Any other method is treated
+     * as a message delivery and passed to {@link #deliverMessage(Method, Object[])}.
+     *
+     * @param proxy  not used by this method
+     * @param method the method that was called
+     * @param args   the arguments of the call
+     * @return {@code null} for the three life-cycle methods, otherwise the result of the routed call
+     * @throws UnsupportedOperationException for any other method declared by {@code Object}
+     * @throws Throwable anything thrown by the routed call
+     */
+    public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+        final String methodName = method.getName();
+        final Class<?>[] parameterTypes = method.getParameterTypes();
+
+        if (method.getDeclaringClass() == Object.class) {
+            if ("toString".equals(methodName) && parameterTypes.length == 0) {
+                return toString();
+            } else if ("equals".equals(methodName) && parameterTypes.length == 1) {
+                return equals(args[0]);
+            } else if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
+                return hashCode();
+            } else {
+                throw new UnsupportedOperationException("Unkown method: " + method);
+            }
+        }
+
+        if ("beforeDelivery".equals(methodName) && Arrays.deepEquals(new Class[]{Method.class}, parameterTypes)) {
+            beforeDelivery((Method) args[0]);
+            return null;
+        } else if ("afterDelivery".equals(methodName) && parameterTypes.length == 0) {
+            afterDelivery();
+            return null;
+        } else if ("release".equals(methodName) && parameterTypes.length == 0) {
+            release();
+            return null;
+        } else {
+            return deliverMessage(method, args);
+        }
+    }
+
+    /**
+     * Invokes {@code method} on the instance through the container.
+     * <p>
+     * From state NONE this method brackets the call with beforeDelivery/afterDelivery itself; from
+     * BEFORE_CALLED only the call is made. A failure is rethrown unchanged when it is unchecked or
+     * declared by {@code method}; any other failure becomes the cause of an EJBException.
+     *
+     * @param method the method to invoke
+     * @param args   the arguments of the call; a single argument whose class is in org.apache.activemq is wrapped in place
+     * @return the value returned by the container invocation
+     * @throws IllegalStateException if the state is RELEASED, METHOD_CALLED or SYSTEM_EXCEPTION
+     * @throws Throwable the failure of the invocation or of the automatic afterDelivery
+     */
+    public Object deliverMessage(final Method method, final Object[] args) throws Throwable {
+        boolean callBeforeAfter = false;
+
+        // verify current state
+        switch (state) {
+            case NONE:
+                try {
+                    beforeDelivery(method);
+                } catch (final ApplicationServerInternalException e) {
+                    throw (EJBException) new EJBException().initCause(e.getCause());
+                }
+                callBeforeAfter = true;
+                state = State.METHOD_CALLED;
+                break;
+            case BEFORE_CALLED:
+                state = State.METHOD_CALLED;
+                break;
+            case RELEASED:
+                throw new IllegalStateException("Message endpoint factory has been released");
+            case METHOD_CALLED:
+            case SYSTEM_EXCEPTION:
+                throw new IllegalStateException("The last message delivery must be completed with an afterDeliver before another message can be delivered");
+        }
+
+        Throwable throwable = null;
+        Object value = null;
+        try {
+            // deliver the message
+            value = container.invoke(instance, method, null, wrapMessageForAmq5(args));
+        } catch (final SystemException se) {
+            throwable = se.getRootCause() != null ? se.getRootCause() : se;
+            state = State.SYSTEM_EXCEPTION;
+        } catch (final ApplicationException ae) {
+            throwable = ae.getRootCause() != null ? ae.getRootCause() : ae;
+        } finally {
+            // if the adapter is not using before/after, we must call afterDelivery to clean up
+            if (callBeforeAfter) {
+                try {
+                    afterDelivery();
+                } catch (final ApplicationServerInternalException e) {
+                    // the delivery failure wins; otherwise report the cause, or the exception
+                    // itself when it has no cause, so the failure is never silently dropped
+                    if (throwable == null) {
+                        throwable = e.getCause() != null ? e.getCause() : e;
+                    }
+                } catch (final UnavailableException e) {
+                    if (throwable == null) {
+                        throwable = e;
+                    }
+                }
+            }
+        }
+
+        if (throwable != null) {
+            // NOTE(review): prints to stderr; consider routing through the project's logging instead
+            throwable.printStackTrace();
+            if (isValidException(method, throwable)) {
+                throw throwable;
+            } else {
+                throw new EJBException().initCause(throwable);
+            }
+        }
+        return value;
+    }
+
+    /**
+     * Completes a delivery: calls {@code container.afterDelivery(instance)}, asks for a new
+     * instance when the state is SYSTEM_EXCEPTION and then resets the state to NONE. The reset
+     * is skipped when recreateInstance throws.
+     *
+     * @throws IllegalStateException if the state is RELEASED or NONE
+     * @throws ApplicationServerInternalException wrapping the root cause (or the exception
+     *         itself) of a SystemException thrown by the container
+     * @throws UnavailableException if thrown by recreateInstance
+     */
+    public void afterDelivery() throws ApplicationServerInternalException, UnavailableException {
+        switch (state) {
+            case RELEASED:
+                throw new IllegalStateException("Message endpoint factory has been released");
+            case NONE:
+                throw new IllegalStateException("afterDelivery may only be called if message delivery began with a beforeDelivery call");
+        }
+
+        // call afterDelivery on the container
+        boolean exceptionThrown = false;
+        try {
+            container.afterDelivery(instance);
+        } catch (final SystemException se) {
+            exceptionThrown = true;
+
+            final Throwable throwable = se.getRootCause() != null ? se.getRootCause() : se;
+            throwable.printStackTrace();
+            throw new ApplicationServerInternalException(throwable);
+        } finally {
+            if (state == State.SYSTEM_EXCEPTION) {
+                recreateInstance(exceptionThrown);
+            }
+            // we are now in the default NONE state
+            state = State.NONE;
+        }
+    }
+
+    /**
+     * Workaround for AMQ 5/JMS 2 support: when {@code args} holds exactly one JMS message whose
+     * class name starts with "org.apache.activemq.", replaces it in place with {@code JMS2.wrap(...)}.
+     * A null argument, a non-JMS argument or a DelegateMessage is returned untouched.
+     *
+     * @param args the arguments of the delivery; may be null
+     * @return the same array, possibly with its single element replaced
+     */
+    private Object[] wrapMessageForAmq5(final Object[] args) {
+        if (args == null || args.length != 1) {
+            return args;
+        }
+
+        final Object candidate = args[0];
+        // null or non-JMS arguments cannot be wrapped (getClass()/cast() below would fail)
+        if (!(candidate instanceof Message) || DelegateMessage.class.isInstance(candidate)) {
+            return args;
+        }
+
+        // the result of the class-name check is cached after the first message
+        if (isAmq == null) {
+            synchronized (this) {
+                if (isAmq == null) {
+                    isAmq = candidate.getClass().getName().startsWith("org.apache.activemq.");
+                }
+            }
+        }
+        if (isAmq) {
+            args[0] = JMS2.wrap(Message.class.cast(candidate));
+        }
+        return args;
+    }
+
+    /**
+     * Tells whether {@code throwable} may be rethrown as is from {@code method}.
+     *
+     * @return true for a RuntimeException, an Error or an instance of a type declared in the
+     *         {@code throws} clause of {@code method}
+     */
+    private boolean isValidException(final Method method, final Throwable throwable) {
+        if (throwable instanceof RuntimeException || throwable instanceof Error) {
+            return true;
+        }
+
+        final Class<?>[] exceptionTypes = method.getExceptionTypes();
+        for (final Class<?> exceptionType : exceptionTypes) {
+            if (exceptionType.isInstance(throwable)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/tomee/blob/a1992b41/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java
index 489ace2..0b45b5d 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java
@@ -52,10 +52,6 @@ public class EndpointHandler implements InvocationHandler, MessageEndpoint {
 
 
     public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
-//        System.out.println("\n" +
-//                "***************************************\n" +
-//                "Endpoint invoked " + method + "\n" +
-//                "***************************************\n\n");
 
         final String methodName = method.getName();
         final Class<?>[] parameterTypes = method.getParameterTypes();
@@ -72,7 +68,6 @@ public class EndpointHandler implements InvocationHandler, MessageEndpoint {
             }
         }
 
-//        try {
         if ("beforeDelivery".equals(methodName) && Arrays.deepEquals(new Class[]{Method.class}, parameterTypes)) {
             beforeDelivery((Method) args[0]);
             return null;
@@ -86,7 +81,6 @@ public class EndpointHandler implements InvocationHandler, MessageEndpoint {
             final Object value = deliverMessage(method, args);
             return value;
         }
-//        } finally { logTx(); }
     }
 
     public void beforeDelivery(final Method method) throws ApplicationServerInternalException {

http://git-wip-us.apache.org/repos/asf/tomee/blob/a1992b41/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/PoolEndpointHandler.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/PoolEndpointHandler.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/PoolEndpointHandler.java
index cc1cc13..f507a35 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/PoolEndpointHandler.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/PoolEndpointHandler.java
@@ -57,10 +57,6 @@ public class PoolEndpointHandler implements InvocationHandler, MessageEndpoint {
     }
 
     public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
-//        System.out.println("\n" +
-//                "***************************************\n" +
-//                "Endpoint invoked " + method + "\n" +
-//                "***************************************\n\n");
 
         final String methodName = method.getName();
         final Class<?>[] parameterTypes = method.getParameterTypes();
@@ -77,7 +73,6 @@ public class PoolEndpointHandler implements InvocationHandler, MessageEndpoint {
             }
         }
 
-//        try {
         if ("beforeDelivery".equals(methodName) && Arrays.deepEquals(new Class[]{Method.class}, parameterTypes)) {
             beforeDelivery((Method) args[0]);
             return null;
@@ -91,7 +86,6 @@ public class PoolEndpointHandler implements InvocationHandler, MessageEndpoint {
             final Object value = deliverMessage(method, args);
             return value;
         }
-//        } finally { logTx(); }
     }
 
     public void beforeDelivery(final Method method) throws ApplicationServerInternalException {