You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by rm...@apache.org on 2014/07/23 19:08:37 UTC
svn commit: r1612877 -
/tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java
Author: rmannibucau
Date: Wed Jul 23 17:08:37 2014
New Revision: 1612877
URL: http://svn.apache.org/r1612877
Log:
TOMEE-1276 TimerExecutor shouldn't kill the executor each time
Modified:
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java
Modified: tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java?rev=1612877&r1=1612876&r2=1612877&view=diff
==============================================================================
--- tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java (original)
+++ tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java Wed Jul 23 17:08:37 2014
@@ -24,10 +24,13 @@ import org.apache.openejb.util.ExecutorB
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @version $Rev$ $Date$
@@ -68,6 +71,7 @@ public class DefaultTimerThreadPoolAdapt
// that specifically and have it explicitly created somewhere
public static final class TimerExecutor {
private final Executor executor;
+ private final AtomicInteger references = new AtomicInteger(0);
private TimerExecutor(final Executor executor) {
if (executor == null) {
@@ -75,6 +79,15 @@ public class DefaultTimerThreadPoolAdapt
}
this.executor = executor;
}
+
+ public TimerExecutor incr() {
+ references.incrementAndGet();
+ return this;
+ }
+
+ public boolean decr() {
+ return references.decrementAndGet() == 0;
+ }
}
@Override
@@ -128,14 +141,15 @@ public class DefaultTimerThreadPoolAdapt
final TimerExecutor timerExecutor = SystemInstance.get().getComponent(TimerExecutor.class);
if (timerExecutor != null) {
- this.executor = timerExecutor.executor;
+ this.executor = timerExecutor.incr().executor;
} else {
this.executor = new ExecutorBuilder()
.size(threadCount)
.prefix("EjbTimerPool")
.build(SystemInstance.get().getOptions());
- SystemInstance.get().setComponent(TimerExecutor.class, new TimerExecutor(this.executor));
+ final TimerExecutor value = new TimerExecutor(this.executor).incr();
+ SystemInstance.get().setComponent(TimerExecutor.class, value);
}
this.threadPoolExecutorUsed = this.executor instanceof ThreadPoolExecutor;
@@ -157,19 +171,48 @@ public class DefaultTimerThreadPoolAdapt
}
@Override
- public void shutdown(final boolean arg0) {
+ public synchronized void shutdown(final boolean waitForJobsToComplete) {
if (threadPoolExecutorUsed) {
- final ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
- tpe.shutdown();
- if (arg0) {
- final int timeout = SystemInstance.get().getOptions().get(OPENEJB_EJB_TIMER_POOL_AWAIT_SECONDS, 5);
- try {
- tpe.awaitTermination(timeout, TimeUnit.SECONDS);
- } catch (final InterruptedException e) {
- logger.error(e.getMessage(), e);
+ final SystemInstance systemInstance = SystemInstance.get();
+ final TimerExecutor te = systemInstance.getComponent(TimerExecutor.class);
+ if (te != null) {
+ if (te.executor == executor) {
+ if (te.decr()) {
+ doShutdownExecutor(waitForJobsToComplete);
+ systemInstance.removeComponent(TimerExecutor.class);
+ } else { // flush jobs, maybe not all dedicated to this threadpool if shared but shouldn't be an issue
+ final ThreadPoolExecutor tpe = ThreadPoolExecutor.class.cast(executor);
+ if (waitForJobsToComplete) {
+ final Collection<Runnable> jobs = new ArrayList<>();
+ tpe.getQueue().drainTo(jobs);
+ for (final Runnable r : jobs) {
+ try {
+ r.run();
+ } catch (final Exception e) {
+ logger.warning(e.getMessage(), e);
+ }
+ }
+ }
+ }
+ } else {
+ doShutdownExecutor(waitForJobsToComplete);
}
+ } else {
+ doShutdownExecutor(waitForJobsToComplete);
+ }
+ }
+ }
+
+ private void doShutdownExecutor(final boolean waitJobs) {
+ final ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
+ tpe.shutdown();
+ if (waitJobs) {
+ final int timeout = SystemInstance.get().getOptions().get(OPENEJB_EJB_TIMER_POOL_AWAIT_SECONDS, 5);
+ try {
+ tpe.awaitTermination(timeout, TimeUnit.SECONDS);
+ } catch (final InterruptedException e) {
+ logger.error(e.getMessage(), e);
}
- SystemInstance.get().removeComponent(TimerExecutor.class);
}
}