You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by tv...@apache.org on 2018/01/18 15:44:37 UTC
[49/50] tomee git commit: Removing shared mdb/stateless code. We will
use private classes for the mdb container like we originally have done with
the stateless container.
Removing shared mdb/stateless code. We will use private classes for the mdb container like we originally have done with the stateless container.
Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/34a159fb
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/34a159fb
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/34a159fb
Branch: refs/heads/master
Commit: 34a159fbafa0bddcf9e0f8ca37700537e5fb3887
Parents: 1f05b1b
Author: Thiago Veronezi <th...@veronezi.org>
Authored: Wed Jan 17 10:11:46 2018 -0500
Committer: Thiago Veronezi <th...@veronezi.org>
Committed: Wed Jan 17 10:11:46 2018 -0500
----------------------------------------------------------------------
.../core/instance/InstanceCreatorRunnable.java | 41 --
.../openejb/core/instance/InstanceManager.java | 358 ----------------
.../core/instance/InstanceManagerData.java | 79 ----
.../openejb/core/mdb/MdbInstanceManager.java | 372 +++++++++++++++-
.../apache/openejb/core/stateless/Instance.java | 48 +++
.../core/stateless/StatelessContainer.java | 1 -
.../stateless/StatelessInstanceManager.java | 422 +++++++++++++++++--
7 files changed, 808 insertions(+), 513 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tomee/blob/34a159fb/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceCreatorRunnable.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceCreatorRunnable.java b/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceCreatorRunnable.java
deleted file mode 100644
index d87c330..0000000
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceCreatorRunnable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.openejb.core.instance;
-
-import org.apache.openejb.core.mdb.Instance;
-
-public final class InstanceCreatorRunnable implements Runnable {
-
- private final InstanceManagerData data;
- private final InstanceManager.InstanceSupplier supplier;
- private final long offset;
-
- public InstanceCreatorRunnable(final long maxAge, final long iteration, final long min, final double maxAgeOffset,
- final InstanceManagerData data, final InstanceManager.InstanceSupplier supplier) {
- this.data = data;
- this.supplier = supplier;
- this.offset = maxAge > 0 ? (long) (maxAge / maxAgeOffset * min * iteration) % maxAge : 0l;
- }
-
- @Override
- public void run() {
- final Instance obj = supplier.create();
- if (obj != null) {
- data.getPool().add(obj, offset);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tomee/blob/34a159fb/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManager.java b/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManager.java
deleted file mode 100644
index 8043a55..0000000
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManager.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.openejb.core.instance;
-
-import org.apache.openejb.ApplicationException;
-import org.apache.openejb.BeanContext;
-import org.apache.openejb.OpenEJBException;
-import org.apache.openejb.SystemException;
-import org.apache.openejb.cdi.CdiEjbBean;
-import org.apache.openejb.core.InstanceContext;
-import org.apache.openejb.core.Operation;
-import org.apache.openejb.core.ThreadContext;
-import org.apache.openejb.core.interceptor.InterceptorData;
-import org.apache.openejb.core.interceptor.InterceptorStack;
-import org.apache.openejb.core.mdb.Instance;
-import org.apache.openejb.loader.Options;
-import org.apache.openejb.monitoring.LocalMBeanServer;
-import org.apache.openejb.util.DaemonThreadFactory;
-import org.apache.openejb.util.Duration;
-import org.apache.openejb.util.LogCategory;
-import org.apache.openejb.util.Logger;
-import org.apache.openejb.util.Pool;
-
-import javax.ejb.ConcurrentAccessTimeoutException;
-import javax.ejb.SessionBean;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.rmi.RemoteException;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Level;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-public abstract class InstanceManager {
- protected static final Logger logger = Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources");
- protected static final Method removeSessionBeanMethod;
-
- static { // initialize it only once
- Method foundRemoveMethod;
- try {
- foundRemoveMethod = SessionBean.class.getDeclaredMethod("ejbRemove");
- } catch (final NoSuchMethodException e) {
- foundRemoveMethod = null;
- }
- removeSessionBeanMethod = foundRemoveMethod;
- }
-
- protected final Duration accessTimeout;
- protected final Duration closeTimeout;
- protected final Pool.Builder poolBuilder;
- protected final ThreadPoolExecutor executor;
- protected final ScheduledExecutorService scheduledExecutor;
-
- public InstanceManager(final Duration accessTimeout, final Duration closeTimeout,
- final Pool.Builder poolBuilder, final int callbackThreads,
- final ScheduledExecutorService ses) {
- this.accessTimeout = accessTimeout;
- this.closeTimeout = closeTimeout;
- this.poolBuilder = poolBuilder;
- this.scheduledExecutor = ses;
-
- if (ScheduledThreadPoolExecutor.class.isInstance(ses) && !ScheduledThreadPoolExecutor.class.cast(ses).getRemoveOnCancelPolicy()) {
- ScheduledThreadPoolExecutor.class.cast(ses).setRemoveOnCancelPolicy(true);
- }
-
- if (accessTimeout.getUnit() == null) {
- accessTimeout.setUnit(TimeUnit.MILLISECONDS);
- }
-
- final int qsize = callbackThreads > 1 ? callbackThreads - 1 : 1;
- final ThreadFactory threadFactory = new DaemonThreadFactory("InstanceManagerPool.worker.");
- this.executor = new ThreadPoolExecutor(
- callbackThreads, callbackThreads * 2,
- 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(qsize), threadFactory);
-
- this.executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(final Runnable r, final ThreadPoolExecutor tpe) {
-
- if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) {
- return;
- }
-
- try {
- if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) {
- logger.warning("Executor failed to run asynchronous process: " + r);
- }
- } catch (final InterruptedException e) {
- //Ignore
- }
- }
- });
- }
-
- protected final class InstanceSupplier implements Pool.Supplier<Instance> {
- private final BeanContext beanContext;
-
- public InstanceSupplier(final BeanContext beanContext) {
- this.beanContext = beanContext;
- }
-
- @Override
- public void discard(final Instance instance, final Pool.Event reason) {
-
- final ThreadContext ctx = new ThreadContext(beanContext, null);
- final ThreadContext oldCallContext = ThreadContext.enter(ctx);
- try {
- freeInstance(ctx, instance);
- } finally {
- ThreadContext.exit(oldCallContext);
- }
- }
-
- @Override
- public Instance create() {
- final ThreadContext ctx = new ThreadContext(beanContext, null);
- final ThreadContext oldCallContext = ThreadContext.enter(ctx);
- try {
- return createInstance(ctx.getBeanContext());
- } catch (final OpenEJBException e) {
- logger.error("Unable to fill pool: for deployment '" + beanContext.getDeploymentID() + "'", e);
- } finally {
- ThreadContext.exit(oldCallContext);
- }
- return null;
- }
- }
-
- public void destroy() {
- if (executor != null) {
- executor.shutdown();
- try {
- if (!executor.awaitTermination(10000, MILLISECONDS)) {
- java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired");
- }
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- if (scheduledExecutor != null) {
- scheduledExecutor.shutdown();
- try {
- if (!scheduledExecutor.awaitTermination(10000, MILLISECONDS)) {
- java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired");
- }
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- /**
- * Removes an instance from the pool and returns it for use
- * by the container in business methods.
- * <p/>
- * If the pool is at it's limit the StrictPooling flag will
- * cause this thread to wait.
- * <p/>
- * If StrictPooling is not enabled this method will create a
- * new bean instance performing all required injection
- * and callbacks before returning it in a method ready state.
- *
- * @param callContext ThreadContext
- * @return Object
- * @throws OpenEJBException
- */
- public Instance getInstance(final ThreadContext callContext) throws OpenEJBException {
- final BeanContext beanContext = callContext.getBeanContext();
- final InstanceManagerData data = (InstanceManagerData) beanContext.getContainerData();
-
- Instance instance = null;
- try {
- final Pool<Instance>.Entry entry = data.poolPop();
-
- if (entry != null) {
- instance = entry.get();
- instance.setPoolEntry(entry);
- }
- } catch (final TimeoutException e) {
- final String msg = "No instances available in Session Bean pool. Waited " + data.getAccessTimeout().toString();
- final ConcurrentAccessTimeoutException timeoutException = new ConcurrentAccessTimeoutException(msg);
- timeoutException.fillInStackTrace();
- throw new ApplicationException(timeoutException);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new OpenEJBException("Unexpected Interruption of current thread: ", e);
- }
-
- if (null == instance) {
- instance = createInstance(beanContext);
- }
-
- return instance;
- }
-
- private Instance createInstance(final BeanContext beanContext) throws ApplicationException {
- try {
- final InstanceContext context = beanContext.newInstance();
- return new Instance(context.getBean(), context.getInterceptors(), context.getCreationalContext());
-
- } catch (Throwable e) {
- if (e instanceof InvocationTargetException) {
- e = ((InvocationTargetException) e).getTargetException();
- }
- final String t = "The bean instance " + beanContext.getDeploymentID() + " threw a system exception:" + e;
- logger.error(t, e);
- throw new ApplicationException(new RemoteException("Cannot obtain a free instance.", e));
- }
- }
-
- /**
- * All instances are removed from the pool in getInstance(...). They are only
- * returned by the Container via this method under two circumstances.
- * <p/>
- * 1. The business method returns normally
- * 2. The business method throws an application exception
- * <p/>
- * Instances are not returned to the pool if the business method threw a system
- * exception.
- *
- * @param callContext ThreadContext
- * @param bean Object
- * @throws OpenEJBException
- */
- public void poolInstance(final ThreadContext callContext, final Object bean) throws OpenEJBException {
-
- if (bean == null) {
- throw new SystemException("Invalid arguments");
- }
-
- final Instance instance = Instance.class.cast(bean);
- final BeanContext beanContext = callContext.getBeanContext();
- final InstanceManagerData data = (InstanceManagerData) beanContext.getContainerData();
- final Pool<Instance> pool = data.getPool();
-
- if (instance.getPoolEntry() != null) {
- pool.push(instance.getPoolEntry());
- } else {
- pool.push(instance);
- }
- }
-
- /**
- * This method is called to release the semaphore in case of the business method
- * throwing a system exception
- *
- * @param callContext ThreadContext
- * @param bean Object
- */
- public void discardInstance(final ThreadContext callContext, final Object bean) throws SystemException {
-
- if (bean == null) {
- throw new SystemException("Invalid arguments");
- }
-
- final Instance instance = Instance.class.cast(bean);
- final BeanContext beanContext = callContext.getBeanContext();
- final InstanceManagerData data = (InstanceManagerData) beanContext.getContainerData();
-
- if (null != data) {
- final Pool<Instance> pool = data.getPool();
- pool.discard(instance.getPoolEntry());
- }
- }
-
- @SuppressWarnings("unchecked")
- private void freeInstance(final ThreadContext callContext, final Instance instance) {
- try {
- callContext.setCurrentOperation(Operation.PRE_DESTROY);
- final BeanContext beanContext = callContext.getBeanContext();
-
- final Method remove = instance.bean instanceof SessionBean ? removeSessionBeanMethod : null;
-
- final List<InterceptorData> callbackInterceptors = beanContext.getCallbackInterceptors();
- final InterceptorStack interceptorStack = new InterceptorStack(instance.bean, remove, Operation.PRE_DESTROY, callbackInterceptors, instance.interceptors);
-
- final CdiEjbBean<Object> bean = beanContext.get(CdiEjbBean.class);
- if (bean != null) { // TODO: see if it should be called before or after next call
- bean.getInjectionTarget().preDestroy(instance.bean);
- }
- interceptorStack.invoke();
-
- if (instance.creationalContext != null) {
- instance.creationalContext.release();
- }
- } catch (final Throwable re) {
- logger.error("The bean instance " + instance + " threw a system exception:" + re, re);
- }
-
- }
-
- protected void setDefault(final Duration duration, final TimeUnit unit) {
- if (duration.getUnit() == null) {
- duration.setUnit(unit);
- }
- }
-
- protected Duration getDuration(final Options options, final String property, final Duration defaultValue, final TimeUnit defaultUnit) {
- final String s = options.get(property, defaultValue.toString());
- final Duration duration = new Duration(s);
- if (duration.getUnit() == null) {
- duration.setUnit(defaultUnit);
- }
- return duration;
- }
-
- public void undeploy(final BeanContext beanContext) {
- final InstanceManagerData data = (InstanceManagerData) beanContext.getContainerData();
- if (data == null) {
- return;
- }
-
- final MBeanServer server = LocalMBeanServer.get();
- for (final ObjectName objectName : data.getJmxNames()) {
- try {
- server.unregisterMBean(objectName);
- } catch (final Exception e) {
- logger.error("Unable to unregister MBean " + objectName);
- }
- }
-
- try {
- if (!data.closePool()) {
- logger.error("Timed-out waiting for instance manager pool to close: for deployment '" + beanContext.getDeploymentID() + "'");
- }
-
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- beanContext.setContainerData(null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tomee/blob/34a159fb/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManagerData.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManagerData.java b/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManagerData.java
deleted file mode 100644
index a68b324..0000000
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManagerData.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.openejb.core.instance;
-
-import org.apache.openejb.core.BaseContext;
-import org.apache.openejb.core.mdb.Instance;
-import org.apache.openejb.util.Duration;
-import org.apache.openejb.util.Pool;
-
-import javax.management.ObjectName;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-public class InstanceManagerData {
-
- private final Pool<Instance> pool;
- private final Duration accessTimeout;
- private final Duration closeTimeout;
- private final List<ObjectName> jmxNames = new ArrayList<ObjectName>();
- private BaseContext baseContext;
-
- public InstanceManagerData(final Pool<Instance> pool, final Duration accessTimeout, final Duration closeTimeout) {
- this.pool = pool;
- this.accessTimeout = accessTimeout;
- this.closeTimeout = closeTimeout;
- }
-
- public Duration getAccessTimeout() {
- return accessTimeout;
- }
-
- public Pool<Instance>.Entry poolPop() throws InterruptedException, TimeoutException {
- return pool.pop(accessTimeout.getTime(), accessTimeout.getUnit());
- }
-
- public Pool<Instance> getPool() {
- return pool;
- }
- public void flush() {
- this.pool.flush();
- }
-
- public boolean closePool() throws InterruptedException {
- return pool.close(closeTimeout.getTime(), closeTimeout.getUnit());
- }
-
- public ObjectName add(final ObjectName name) {
- jmxNames.add(name);
- return name;
- }
-
- public List<ObjectName> getJmxNames() {
- return jmxNames;
- }
-
- public BaseContext getBaseContext() {
- return baseContext;
- }
-
- public void setBaseContext(BaseContext baseContext) {
- this.baseContext = baseContext;
- }
-}
http://git-wip-us.apache.org/repos/asf/tomee/blob/34a159fb/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java
index d1fcd6c..a74dd03 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java
@@ -17,23 +17,34 @@
package org.apache.openejb.core.mdb;
+import org.apache.openejb.ApplicationException;
import org.apache.openejb.BeanContext;
import org.apache.openejb.OpenEJBException;
-import org.apache.openejb.core.instance.InstanceCreatorRunnable;
-import org.apache.openejb.core.instance.InstanceManager;
-import org.apache.openejb.core.instance.InstanceManagerData;
+import org.apache.openejb.SystemException;
+import org.apache.openejb.cdi.CdiEjbBean;
+import org.apache.openejb.core.BaseContext;
+import org.apache.openejb.core.InstanceContext;
+import org.apache.openejb.core.Operation;
+import org.apache.openejb.core.ThreadContext;
+import org.apache.openejb.core.interceptor.InterceptorData;
+import org.apache.openejb.core.interceptor.InterceptorStack;
import org.apache.openejb.loader.Options;
import org.apache.openejb.monitoring.LocalMBeanServer;
import org.apache.openejb.monitoring.ManagedMBean;
import org.apache.openejb.monitoring.ObjectNameBuilder;
import org.apache.openejb.monitoring.StatsInterceptor;
import org.apache.openejb.spi.SecurityService;
+import org.apache.openejb.util.DaemonThreadFactory;
import org.apache.openejb.util.Duration;
+import org.apache.openejb.util.LogCategory;
+import org.apache.openejb.util.Logger;
import org.apache.openejb.util.PassthroughFactory;
import org.apache.openejb.util.Pool;
import org.apache.xbean.recipe.ObjectRecipe;
import org.apache.xbean.recipe.Option;
+import javax.ejb.ConcurrentAccessTimeoutException;
+import javax.ejb.SessionBean;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
@@ -56,18 +67,47 @@ import javax.resource.spi.ActivationSpec;
import javax.resource.spi.ResourceAdapter;
import java.io.Flushable;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static javax.management.MBeanOperationInfo.ACTION;
-public class MdbInstanceManager extends InstanceManager {
+public class MdbInstanceManager {
+ protected static final Logger logger = Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources");
+ protected static final Method removeSessionBeanMethod;
+
+ static { // initialize it only once
+ Method foundRemoveMethod;
+ try {
+ foundRemoveMethod = SessionBean.class.getDeclaredMethod("ejbRemove");
+ } catch (final NoSuchMethodException e) {
+ foundRemoveMethod = null;
+ }
+ removeSessionBeanMethod = foundRemoveMethod;
+ }
+
+ private final Duration accessTimeout;
+ private final Duration closeTimeout;
+ private final Pool.Builder poolBuilder;
+ private final ThreadPoolExecutor executor;
+ private final ScheduledExecutorService scheduledExecutor;
private final Map<BeanContext, MdbPoolContainer.MdbActivationContext> activationContexts = new ConcurrentHashMap<>();
private final Map<BeanContext, ObjectName> mbeanNames = new ConcurrentHashMap<>();
@@ -84,7 +124,43 @@ public class MdbInstanceManager extends InstanceManager {
final Duration accessTimeout, final Duration closeTimeout,
final Pool.Builder poolBuilder, final int callbackThreads,
final ScheduledExecutorService ses) {
- super(accessTimeout, closeTimeout, poolBuilder, callbackThreads, ses);
+ this.accessTimeout = accessTimeout;
+ this.closeTimeout = closeTimeout;
+ this.poolBuilder = poolBuilder;
+ this.scheduledExecutor = ses;
+
+ if (ScheduledThreadPoolExecutor.class.isInstance(ses) && !ScheduledThreadPoolExecutor.class.cast(ses).getRemoveOnCancelPolicy()) {
+ ScheduledThreadPoolExecutor.class.cast(ses).setRemoveOnCancelPolicy(true);
+ }
+
+ if (accessTimeout.getUnit() == null) {
+ accessTimeout.setUnit(TimeUnit.MILLISECONDS);
+ }
+
+ final int qsize = callbackThreads > 1 ? callbackThreads - 1 : 1;
+ final ThreadFactory threadFactory = new DaemonThreadFactory("InstanceManagerPool.worker.");
+ this.executor = new ThreadPoolExecutor(
+ callbackThreads, callbackThreads * 2,
+ 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(qsize), threadFactory);
+
+ this.executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(final Runnable r, final ThreadPoolExecutor tpe) {
+
+ if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) {
+ return;
+ }
+
+ try {
+ if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) {
+ logger.warning("Executor failed to run asynchronous process: " + r);
+ }
+ } catch (final InterruptedException e) {
+ //Ignore
+ }
+ }
+ });
+
this.securityService = securityService;
this.resourceAdapter = resourceAdapter;
this.inboundRecovery = inboundRecovery;
@@ -93,7 +169,7 @@ public class MdbInstanceManager extends InstanceManager {
public void deploy(final BeanContext beanContext, final ActivationSpec activationSpec, final EndpointFactory endpointFactory)
- throws OpenEJBException{
+ throws OpenEJBException {
if (inboundRecovery != null) {
inboundRecovery.recover(resourceAdapter, activationSpec, containerID.toString());
}
@@ -118,7 +194,7 @@ public class MdbInstanceManager extends InstanceManager {
final long maxAge = builder.getMaxAge().getTime(TimeUnit.MILLISECONDS);
final double maxAgeOffset = builder.getMaxAgeOffset();
- final InstanceManagerData data = new InstanceManagerData(builder.build(), accessTimeout, closeTimeout);
+ final Data data = new Data(builder.build(), accessTimeout, closeTimeout);
MdbContext mdbContext = new MdbContext(securityService, new Flushable() {
@Override
@@ -182,7 +258,7 @@ public class MdbInstanceManager extends InstanceManager {
String jmxName = beanContext.getActivationProperties().get("MdbJMXControl");
if (jmxName == null) {
- jmxName = "true";
+ jmxName = "true";
}
addJMxControl(beanContext, jmxName, activationContext);
@@ -209,7 +285,7 @@ public class MdbInstanceManager extends InstanceManager {
data.getPool().start();
}
- public void undeploy(final BeanContext beanContext){
+ public void undeploy(final BeanContext beanContext) {
final MdbPoolContainer.MdbActivationContext actContext = activationContexts.get(beanContext);
if (actContext == null) {
return;
@@ -335,4 +411,282 @@ public class MdbInstanceManager extends InstanceManager {
return ATTRIBUTE_LIST;
}
}
+
+ private final class InstanceSupplier implements Pool.Supplier<Instance> {
+ private final BeanContext beanContext;
+
+ public InstanceSupplier(final BeanContext beanContext) {
+ this.beanContext = beanContext;
+ }
+
+ @Override
+ public void discard(final Instance instance, final Pool.Event reason) {
+
+ final ThreadContext ctx = new ThreadContext(beanContext, null);
+ final ThreadContext oldCallContext = ThreadContext.enter(ctx);
+ try {
+ freeInstance(ctx, instance);
+ } finally {
+ ThreadContext.exit(oldCallContext);
+ }
+ }
+
+ @Override
+ public Instance create() {
+ final ThreadContext ctx = new ThreadContext(beanContext, null);
+ final ThreadContext oldCallContext = ThreadContext.enter(ctx);
+ try {
+ return createInstance(ctx.getBeanContext());
+ } catch (final OpenEJBException e) {
+ logger.error("Unable to fill pool: for deployment '" + beanContext.getDeploymentID() + "'", e);
+ } finally {
+ ThreadContext.exit(oldCallContext);
+ }
+ return null;
+ }
+ }
+
+ public void destroy() {
+ if (executor != null) {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(10000, MILLISECONDS)) {
+ java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired");
+ }
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (scheduledExecutor != null) {
+ scheduledExecutor.shutdown();
+ try {
+ if (!scheduledExecutor.awaitTermination(10000, MILLISECONDS)) {
+ java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired");
+ }
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Removes an instance from the pool and returns it for use
+ * by the container in business methods.
+ * <p/>
+ * If the pool is at it's limit the StrictPooling flag will
+ * cause this thread to wait.
+ * <p/>
+ * If StrictPooling is not enabled this method will create a
+ * new bean instance performing all required injection
+ * and callbacks before returning it in a method ready state.
+ *
+ * @param callContext ThreadContext
+ * @return Object
+ * @throws OpenEJBException
+ */
+ public Instance getInstance(final ThreadContext callContext) throws OpenEJBException {
+ final BeanContext beanContext = callContext.getBeanContext();
+ final Data data = (Data) beanContext.getContainerData();
+
+ Instance instance = null;
+ try {
+ final Pool<Instance>.Entry entry = data.poolPop();
+
+ if (entry != null) {
+ instance = entry.get();
+ instance.setPoolEntry(entry);
+ }
+ } catch (final TimeoutException e) {
+ final String msg = "No instances available in Session Bean pool. Waited " + data.getAccessTimeout().toString();
+ final ConcurrentAccessTimeoutException timeoutException = new ConcurrentAccessTimeoutException(msg);
+ timeoutException.fillInStackTrace();
+ throw new ApplicationException(timeoutException);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new OpenEJBException("Unexpected Interruption of current thread: ", e);
+ }
+
+ if (null == instance) {
+ instance = createInstance(beanContext);
+ }
+
+ return instance;
+ }
+
+ private Instance createInstance(final BeanContext beanContext) throws ApplicationException {
+ try {
+ final InstanceContext context = beanContext.newInstance();
+ return new Instance(context.getBean(), context.getInterceptors(), context.getCreationalContext());
+
+ } catch (Throwable e) {
+ if (e instanceof InvocationTargetException) {
+ e = ((InvocationTargetException) e).getTargetException();
+ }
+ final String t = "The bean instance " + beanContext.getDeploymentID() + " threw a system exception:" + e;
+ logger.error(t, e);
+ throw new ApplicationException(new RemoteException("Cannot obtain a free instance.", e));
+ }
+ }
+
+ /**
+ * All instances are removed from the pool in getInstance(...). They are only
+ * returned by the Container via this method under two circumstances.
+ * <p/>
+ * 1. The business method returns normally
+ * 2. The business method throws an application exception
+ * <p/>
+ * Instances are not returned to the pool if the business method threw a system
+ * exception.
+ *
+ * @param callContext ThreadContext
+ * @param bean Object
+ * @throws OpenEJBException
+ */
+ public void poolInstance(final ThreadContext callContext, final Object bean) throws OpenEJBException {
+
+ if (bean == null) {
+ throw new SystemException("Invalid arguments");
+ }
+
+ final Instance instance = Instance.class.cast(bean);
+ final BeanContext beanContext = callContext.getBeanContext();
+ final Data data = (Data) beanContext.getContainerData();
+ final Pool<Instance> pool = data.getPool();
+
+ if (instance.getPoolEntry() != null) {
+ pool.push(instance.getPoolEntry());
+ } else {
+ pool.push(instance);
+ }
+ }
+
+ /**
+ * This method is called to release the semaphore in case of the business method
+ * throwing a system exception
+ *
+ * @param callContext ThreadContext
+ * @param bean Object
+ */
+ public void discardInstance(final ThreadContext callContext, final Object bean) throws SystemException {
+
+ if (bean == null) {
+ throw new SystemException("Invalid arguments");
+ }
+
+ final Instance instance = Instance.class.cast(bean);
+ final BeanContext beanContext = callContext.getBeanContext();
+ final Data data = (Data) beanContext.getContainerData();
+
+ if (null != data) {
+ final Pool<Instance> pool = data.getPool();
+ pool.discard(instance.getPoolEntry());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void freeInstance(final ThreadContext callContext, final Instance instance) {
+ try {
+ callContext.setCurrentOperation(Operation.PRE_DESTROY);
+ final BeanContext beanContext = callContext.getBeanContext();
+
+ final Method remove = instance.bean instanceof SessionBean ? removeSessionBeanMethod : null;
+
+ final List<InterceptorData> callbackInterceptors = beanContext.getCallbackInterceptors();
+ final InterceptorStack interceptorStack = new InterceptorStack(instance.bean, remove, Operation.PRE_DESTROY, callbackInterceptors, instance.interceptors);
+
+ final CdiEjbBean<Object> bean = beanContext.get(CdiEjbBean.class);
+ if (bean != null) { // TODO: see if it should be called before or after next call
+ bean.getInjectionTarget().preDestroy(instance.bean);
+ }
+ interceptorStack.invoke();
+
+ if (instance.creationalContext != null) {
+ instance.creationalContext.release();
+ }
+ } catch (final Throwable re) {
+ logger.error("The bean instance " + instance + " threw a system exception:" + re, re);
+ }
+
+ }
+
+ private void setDefault(final Duration duration, final TimeUnit unit) {
+ if (duration.getUnit() == null) {
+ duration.setUnit(unit);
+ }
+ }
+
+ private final class InstanceCreatorRunnable implements Runnable {
+
+ private final Data data;
+ private final InstanceSupplier supplier;
+ private final long offset;
+
+ public InstanceCreatorRunnable(final long maxAge, final long iteration, final long min, final double maxAgeOffset,
+ final Data data, final InstanceSupplier supplier) {
+ this.data = data;
+ this.supplier = supplier;
+ this.offset = maxAge > 0 ? (long) (maxAge / maxAgeOffset * min * iteration) % maxAge : 0l;
+ }
+
+ @Override
+ public void run() {
+ final Instance obj = supplier.create();
+ if (obj != null) {
+ data.getPool().add(obj, offset);
+ }
+ }
+ }
+
+ private class Data {
+
+ private final Pool<Instance> pool;
+ private final Duration accessTimeout;
+ private final Duration closeTimeout;
+ private final List<ObjectName> jmxNames = new ArrayList<ObjectName>();
+ private BaseContext baseContext;
+
+ public Data(final Pool<Instance> pool, final Duration accessTimeout, final Duration closeTimeout) {
+ this.pool = pool;
+ this.accessTimeout = accessTimeout;
+ this.closeTimeout = closeTimeout;
+ }
+
+ public Duration getAccessTimeout() {
+ return accessTimeout;
+ }
+
+ public Pool<Instance>.Entry poolPop() throws InterruptedException, TimeoutException {
+ return pool.pop(accessTimeout.getTime(), accessTimeout.getUnit());
+ }
+
+ public Pool<Instance> getPool() {
+ return pool;
+ }
+
+ public void flush() {
+ this.pool.flush();
+ }
+
+ public boolean closePool() throws InterruptedException {
+ return pool.close(closeTimeout.getTime(), closeTimeout.getUnit());
+ }
+
+ public ObjectName add(final ObjectName name) {
+ jmxNames.add(name);
+ return name;
+ }
+
+ public List<ObjectName> getJmxNames() {
+ return jmxNames;
+ }
+
+ public BaseContext getBaseContext() {
+ return baseContext;
+ }
+
+ public void setBaseContext(BaseContext baseContext) {
+ this.baseContext = baseContext;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tomee/blob/34a159fb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/Instance.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/Instance.java b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/Instance.java
new file mode 100644
index 0000000..17bbf20
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/Instance.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openejb.core.stateless;
+
+import org.apache.openejb.util.Pool;
+
+import javax.enterprise.context.spi.CreationalContext;
+import java.util.Map;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class Instance {
+ public final Object bean;
+ public final Map<String, Object> interceptors;
+ public final CreationalContext creationalContext;
+
+ private Pool<Instance>.Entry poolEntry;
+
+ public Instance(final Object bean, final Map<String, Object> interceptors, final CreationalContext creationalContext) {
+ this.bean = bean;
+ this.interceptors = interceptors;
+ this.creationalContext = creationalContext;
+ }
+
+ public Pool<Instance>.Entry getPoolEntry() {
+ return poolEntry;
+ }
+
+ public void setPoolEntry(final Pool<Instance>.Entry poolEntry) {
+ this.poolEntry = poolEntry;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tomee/blob/34a159fb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
index bc549ea..ea44ea4 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
@@ -30,7 +30,6 @@ import org.apache.openejb.core.Operation;
import org.apache.openejb.core.ThreadContext;
import org.apache.openejb.core.interceptor.InterceptorData;
import org.apache.openejb.core.interceptor.InterceptorStack;
-import org.apache.openejb.core.mdb.Instance;
import org.apache.openejb.core.security.AbstractSecurityService;
import org.apache.openejb.core.timer.EjbTimerService;
import org.apache.openejb.core.transaction.TransactionPolicy;
http://git-wip-us.apache.org/repos/asf/tomee/blob/34a159fb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
index e73d8ca..aaa91d3 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
@@ -17,12 +17,17 @@
package org.apache.openejb.core.stateless;
+import org.apache.openejb.ApplicationException;
import org.apache.openejb.BeanContext;
import org.apache.openejb.OpenEJBException;
-import org.apache.openejb.core.instance.InstanceCreatorRunnable;
-import org.apache.openejb.core.instance.InstanceManager;
-import org.apache.openejb.core.instance.InstanceManagerData;
+import org.apache.openejb.SystemException;
+import org.apache.openejb.cdi.CdiEjbBean;
+import org.apache.openejb.core.InstanceContext;
+import org.apache.openejb.core.Operation;
+import org.apache.openejb.core.ThreadContext;
+import org.apache.openejb.core.interceptor.InterceptorData;
import org.apache.openejb.core.interceptor.InterceptorInstance;
+import org.apache.openejb.core.interceptor.InterceptorStack;
import org.apache.openejb.core.timer.TimerServiceWrapper;
import org.apache.openejb.loader.Options;
import org.apache.openejb.monitoring.LocalMBeanServer;
@@ -30,46 +35,313 @@ import org.apache.openejb.monitoring.ManagedMBean;
import org.apache.openejb.monitoring.ObjectNameBuilder;
import org.apache.openejb.monitoring.StatsInterceptor;
import org.apache.openejb.spi.SecurityService;
+import org.apache.openejb.util.DaemonThreadFactory;
import org.apache.openejb.util.Duration;
+import org.apache.openejb.util.LogCategory;
+import org.apache.openejb.util.Logger;
import org.apache.openejb.util.PassthroughFactory;
import org.apache.openejb.util.Pool;
import org.apache.xbean.recipe.ObjectRecipe;
import org.apache.xbean.recipe.Option;
+import javax.ejb.ConcurrentAccessTimeoutException;
import javax.ejb.EJBContext;
+import javax.ejb.SessionBean;
+import javax.ejb.SessionContext;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.naming.Context;
import javax.naming.NamingException;
import java.io.Flushable;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
-public class StatelessInstanceManager extends InstanceManager {
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+public class StatelessInstanceManager {
+ private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources");
+ private static final Method removeSessionBeanMethod;
+
+ static { // initialize it only once
+ Method foundRemoveMethod;
+ try {
+ foundRemoveMethod = SessionBean.class.getDeclaredMethod("ejbRemove");
+ } catch (final NoSuchMethodException e) {
+ foundRemoveMethod = null;
+ }
+ removeSessionBeanMethod = foundRemoveMethod;
+ }
+
+ private final Duration accessTimeout;
+ private final Duration closeTimeout;
private final SecurityService securityService;
+ private final Pool.Builder poolBuilder;
+ private final ThreadPoolExecutor executor;
+ private final ScheduledExecutorService scheduledExecutor;
public StatelessInstanceManager(final SecurityService securityService,
final Duration accessTimeout, final Duration closeTimeout,
final Pool.Builder poolBuilder, final int callbackThreads,
final ScheduledExecutorService ses) {
- super(accessTimeout, closeTimeout, poolBuilder, callbackThreads, ses);
this.securityService = securityService;
+ this.accessTimeout = accessTimeout;
+ this.closeTimeout = closeTimeout;
+ this.poolBuilder = poolBuilder;
+ this.scheduledExecutor = ses;
+ if (ScheduledThreadPoolExecutor.class.isInstance(ses) && !ScheduledThreadPoolExecutor.class.cast(ses).getRemoveOnCancelPolicy()) {
+ ScheduledThreadPoolExecutor.class.cast(ses).setRemoveOnCancelPolicy(true);
+ }
+
+ if (accessTimeout.getUnit() == null) {
+ accessTimeout.setUnit(TimeUnit.MILLISECONDS);
+ }
+
+ final int qsize = callbackThreads > 1 ? callbackThreads - 1 : 1;
+ final ThreadFactory threadFactory = new DaemonThreadFactory("StatelessPool.worker.");
+ this.executor = new ThreadPoolExecutor(
+ callbackThreads, callbackThreads * 2,
+ 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(qsize), threadFactory);
+
+ this.executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(final Runnable r, final ThreadPoolExecutor tpe) {
+
+ if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) {
+ return;
+ }
+
+ try {
+ if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) {
+ logger.warning("Executor failed to run asynchronous process: " + r);
+ }
+ } catch (final InterruptedException e) {
+ //Ignore
+ }
+ }
+ });
}
+ private final class StatelessSupplier implements Pool.Supplier<Instance> {
+ private final BeanContext beanContext;
+
+ private StatelessSupplier(final BeanContext beanContext) {
+ this.beanContext = beanContext;
+ }
+
+ @Override
+ public void discard(final Instance instance, final Pool.Event reason) {
+
+ final ThreadContext ctx = new ThreadContext(beanContext, null);
+ final ThreadContext oldCallContext = ThreadContext.enter(ctx);
+ try {
+ freeInstance(ctx, instance);
+ } finally {
+ ThreadContext.exit(oldCallContext);
+ }
+ }
+
+ @Override
+ public Instance create() {
+ final ThreadContext ctx = new ThreadContext(beanContext, null);
+ final ThreadContext oldCallContext = ThreadContext.enter(ctx);
+ try {
+ return createInstance(ctx, ctx.getBeanContext());
+ } catch (final OpenEJBException e) {
+ logger.error("Unable to fill pool: for deployment '" + beanContext.getDeploymentID() + "'", e);
+ } finally {
+ ThreadContext.exit(oldCallContext);
+ }
+ return null;
+ }
+ }
+
+ public void destroy() {
+ if (executor != null) {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(10000, MILLISECONDS)) {
+ java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired");
+ }
+ } catch (final InterruptedException e) {
+ Thread.interrupted();
+ }
+ }
+ if (scheduledExecutor != null) {
+ scheduledExecutor.shutdown();
+ try {
+ if (!scheduledExecutor.awaitTermination(10000, MILLISECONDS)) {
+ java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired");
+ }
+ } catch (final InterruptedException e) {
+ Thread.interrupted();
+ }
+ }
+ }
+
+ /**
+ * Removes an instance from the pool and returns it for use
+ * by the container in business methods.
+ * <p/>
+ * If the pool is at it's limit the StrictPooling flag will
+ * cause this thread to wait.
+ * <p/>
+ * If StrictPooling is not enabled this method will create a
+ * new stateless bean instance performing all required injection
+ * and callbacks before returning it in a method ready state.
+ *
+ * @param callContext ThreadContext
+ * @return Object
+ * @throws OpenEJBException
+ */
+ public Instance getInstance(final ThreadContext callContext) throws OpenEJBException {
+ final BeanContext beanContext = callContext.getBeanContext();
+ final Data data = (Data) beanContext.getContainerData();
+
+ Instance instance = null;
+ try {
+ final Pool<Instance>.Entry entry = data.poolPop();
+
+ if (entry != null) {
+ instance = entry.get();
+ instance.setPoolEntry(entry);
+ }
+ } catch (final TimeoutException e) {
+ final String msg = "No instances available in Stateless Session Bean pool. Waited " + data.accessTimeout.toString();
+ final ConcurrentAccessTimeoutException timeoutException = new ConcurrentAccessTimeoutException(msg);
+ timeoutException.fillInStackTrace();
+ throw new ApplicationException(timeoutException);
+ } catch (final InterruptedException e) {
+ Thread.interrupted();
+ throw new OpenEJBException("Unexpected Interruption of current thread: ", e);
+ }
+
+ if (null == instance) {
+ instance = createInstance(callContext, beanContext);
+ }
+
+ return instance;
+ }
+
+ private Instance createInstance(final ThreadContext callContext, final BeanContext beanContext) throws ApplicationException {
+ try {
+ final InstanceContext context = beanContext.newInstance();
+ return new Instance(context.getBean(), context.getInterceptors(), context.getCreationalContext());
+
+ } catch (Throwable e) {
+ if (e instanceof InvocationTargetException) {
+ e = ((InvocationTargetException) e).getTargetException();
+ }
+ final String t = "The bean instance " + beanContext.getDeploymentID() + " threw a system exception:" + e;
+ logger.error(t, e);
+ throw new ApplicationException(new RemoteException("Cannot obtain a free instance.", e));
+ }
+ }
+
+ /**
+ * All instances are removed from the pool in getInstance(...). They are only
+ * returned by the StatelessContainer via this method under two circumstances.
+ * <p/>
+ * 1. The business method returns normally
+ * 2. The business method throws an application exception
+ * <p/>
+ * Instances are not returned to the pool if the business method threw a system
+ * exception.
+ *
+ * @param callContext ThreadContext
+ * @param bean Object
+ * @throws OpenEJBException
+ */
+ public void poolInstance(final ThreadContext callContext, final Object bean) throws OpenEJBException {
+
+ if (bean == null) {
+ throw new SystemException("Invalid arguments");
+ }
+
+ final Instance instance = Instance.class.cast(bean);
+ final BeanContext beanContext = callContext.getBeanContext();
+ final Data data = (Data) beanContext.getContainerData();
+ final Pool<Instance> pool = data.getPool();
+
+ if (instance.getPoolEntry() != null) {
+ pool.push(instance.getPoolEntry());
+ } else {
+ pool.push(instance);
+ }
+ }
+
+ /**
+ * This method is called to release the semaphore in case of the business method
+ * throwing a system exception
+ *
+ * @param callContext ThreadContext
+ * @param bean Object
+ */
+ public void discardInstance(final ThreadContext callContext, final Object bean) throws SystemException {
+
+ if (bean == null) {
+ throw new SystemException("Invalid arguments");
+ }
+
+ final Instance instance = Instance.class.cast(bean);
+ final BeanContext beanContext = callContext.getBeanContext();
+ final Data data = (Data) beanContext.getContainerData();
+
+ if (null != data) {
+ final Pool<Instance> pool = data.getPool();
+ pool.discard(instance.getPoolEntry());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void freeInstance(final ThreadContext callContext, final Instance instance) {
+ try {
+ callContext.setCurrentOperation(Operation.PRE_DESTROY);
+ final BeanContext beanContext = callContext.getBeanContext();
+
+ final Method remove = instance.bean instanceof SessionBean ? removeSessionBeanMethod : null;
+
+ final List<InterceptorData> callbackInterceptors = beanContext.getCallbackInterceptors();
+ final InterceptorStack interceptorStack = new InterceptorStack(instance.bean, remove, Operation.PRE_DESTROY, callbackInterceptors, instance.interceptors);
+
+ final CdiEjbBean<Object> bean = beanContext.get(CdiEjbBean.class);
+ if (bean != null) { // TODO: see if it should be called before or after next call
+ bean.getInjectionTarget().preDestroy(instance.bean);
+ }
+ interceptorStack.invoke();
+
+ if (instance.creationalContext != null) {
+ instance.creationalContext.release();
+ }
+ } catch (final Throwable re) {
+ logger.error("The bean instance " + instance + " threw a system exception:" + re, re);
+ }
+
+ }
@SuppressWarnings("unchecked")
public void deploy(final BeanContext beanContext) throws OpenEJBException {
final Options options = new Options(beanContext.getProperties());
final Duration accessTimeout = getDuration(
- options,
- "AccessTimeout",
- getDuration(options, "Timeout", this.accessTimeout, TimeUnit.MILLISECONDS), // default timeout
- TimeUnit.MILLISECONDS
+ options,
+ "AccessTimeout",
+ getDuration(options, "Timeout", this.accessTimeout, TimeUnit.MILLISECONDS), // default timeout
+ TimeUnit.MILLISECONDS
);
final Duration closeTimeout = getDuration(options, "CloseTimeout", this.closeTimeout, TimeUnit.MINUTES);
@@ -84,30 +356,20 @@ public class StatelessInstanceManager extends InstanceManager {
setDefault(builder.getIdleTimeout(), TimeUnit.MINUTES);
setDefault(builder.getInterval(), TimeUnit.MINUTES);
- final InstanceSupplier supplier = new InstanceSupplier(beanContext);
+ final StatelessSupplier supplier = new StatelessSupplier(beanContext);
builder.setSupplier(supplier);
builder.setExecutor(executor);
builder.setScheduledExecutor(scheduledExecutor);
-
- final InstanceManagerData data = new InstanceManagerData(builder.build(), accessTimeout, closeTimeout);
-
- StatelessContext statelessContext = new StatelessContext(securityService, new Flushable() {
- @Override
- public void flush() throws IOException {
- data.flush();
- }
- });
- data.setBaseContext(statelessContext);
-
+ final Data data = new Data(builder.build(), accessTimeout, closeTimeout);
beanContext.setContainerData(data);
- beanContext.set(EJBContext.class, data.getBaseContext());
+ beanContext.set(EJBContext.class, data.sessionContext);
try {
final Context context = beanContext.getJndiEnc();
- context.bind("comp/EJBContext", data.getBaseContext());
- context.bind("comp/WebServiceContext", new EjbWsContext(statelessContext));
+ context.bind("comp/EJBContext", data.sessionContext);
+ context.bind("comp/WebServiceContext", new EjbWsContext(data.sessionContext));
context.bind("comp/TimerService", new TimerServiceWrapper());
} catch (final NamingException e) {
throw new OpenEJBException("Failed to bind EJBContext/WebServiceContext/TimerService", e);
@@ -158,7 +420,7 @@ public class StatelessInstanceManager extends InstanceManager {
if (server.isRegistered(objectName)) {
server.unregisterMBean(objectName);
}
- server.registerMBean(new ManagedMBean(data.getPool()), objectName);
+ server.registerMBean(new ManagedMBean(data.pool), objectName);
data.add(objectName);
} catch (final Exception e) {
logger.error("Unable to register MBean ", e);
@@ -180,4 +442,114 @@ public class StatelessInstanceManager extends InstanceManager {
data.getPool().start();
}
+
+ private void setDefault(final Duration duration, final TimeUnit unit) {
+ if (duration.getUnit() == null) {
+ duration.setUnit(unit);
+ }
+ }
+
+ private Duration getDuration(final Options options, final String property, final Duration defaultValue, final TimeUnit defaultUnit) {
+ final String s = options.get(property, defaultValue.toString());
+ final Duration duration = new Duration(s);
+ if (duration.getUnit() == null) {
+ duration.setUnit(defaultUnit);
+ }
+ return duration;
+ }
+
+ public void undeploy(final BeanContext beanContext) {
+ final Data data = (Data) beanContext.getContainerData();
+ if (data == null) {
+ return;
+ }
+
+ final MBeanServer server = LocalMBeanServer.get();
+ for (final ObjectName objectName : data.jmxNames) {
+ try {
+ server.unregisterMBean(objectName);
+ } catch (final Exception e) {
+ logger.error("Unable to unregister MBean " + objectName);
+ }
+ }
+
+ try {
+ if (!data.closePool()) {
+ logger.error("Timed-out waiting for stateless pool to close: for deployment '" + beanContext.getDeploymentID() + "'");
+ }
+
+ } catch (final InterruptedException e) {
+ Thread.interrupted();
+ }
+
+ beanContext.setContainerData(null);
+ }
+
+ private final class Data {
+ private final Pool<Instance> pool;
+ private final Duration accessTimeout;
+ private final Duration closeTimeout;
+ private final List<ObjectName> jmxNames = new ArrayList<ObjectName>();
+ private final SessionContext sessionContext;
+
+ private Data(final Pool<Instance> pool, final Duration accessTimeout, final Duration closeTimeout) {
+ this.pool = pool;
+ this.accessTimeout = accessTimeout;
+ this.closeTimeout = closeTimeout;
+ this.sessionContext = new StatelessContext(securityService, new Flushable() {
+ @Override
+ public void flush() throws IOException {
+ getPool().flush();
+ }
+ });
+ }
+
+ public Duration getAccessTimeout() {
+ return accessTimeout;
+ }
+
+ public Pool<Instance>.Entry poolPop() throws InterruptedException, TimeoutException {
+ return pool.pop(accessTimeout.getTime(), accessTimeout.getUnit());
+ }
+
+ public Pool<Instance> getPool() {
+ return pool;
+ }
+
+ public boolean closePool() throws InterruptedException {
+ return pool.close(closeTimeout.getTime(), closeTimeout.getUnit());
+ }
+
+ public ObjectName add(final ObjectName name) {
+ jmxNames.add(name);
+ return name;
+ }
+ }
+
+ private final class InstanceCreatorRunnable implements Runnable {
+ private final long maxAge;
+ private final long iteration;
+ private final double maxAgeOffset;
+ private final long min;
+ private final Data data;
+ private final StatelessSupplier supplier;
+
+ private InstanceCreatorRunnable(final long maxAge, final long iteration, final long min, final double maxAgeOffset, final Data data, final StatelessSupplier supplier) {
+ this.maxAge = maxAge;
+ this.iteration = iteration;
+ this.min = min;
+ this.maxAgeOffset = maxAgeOffset;
+ this.data = data;
+ this.supplier = supplier;
+ }
+
+ @Override
+ public void run() {
+ final Instance obj = supplier.create();
+ if (obj != null) {
+ final long offset = maxAge > 0 ? (long) (maxAge / maxAgeOffset * min * iteration) % maxAge : 0l;
+ data.getPool().add(obj, offset);
+ }
+ }
+ }
}