You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2017/03/03 20:11:36 UTC

[2/2] tomee git commit: Backport pool start/stop

Backport pool start/stop


Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/39b42eb6
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/39b42eb6
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/39b42eb6

Branch: refs/heads/tomee-1.7.x
Commit: 39b42eb633d2d59a484c7ea78200698c3c6d56e5
Parents: ffea29d
Author: AndyGee <an...@gmx.de>
Authored: Fri Mar 3 21:11:07 2017 +0100
Committer: AndyGee <an...@gmx.de>
Committed: Fri Mar 3 21:11:07 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/openejb/util/Pool.java | 51 ++++++++++++++------
 .../java/org/apache/openejb/util/PoolTest.java  |  1 +
 2 files changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tomee/blob/39b42eb6/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java b/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
index 1e6895e..f87fd52 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
@@ -31,6 +31,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -43,6 +44,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
  * Any successful pop() call requires a corresponding push() or discard() call.
@@ -74,6 +76,7 @@ public class Pool<T> {
 
     private final Supplier<T> supplier;
     private final AtomicReference<ScheduledExecutorService> scheduler = new AtomicReference<ScheduledExecutorService>();
+    private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<ScheduledFuture<?>>();
     private final Sweeper sweeper;
 
     private final CountingLatch out = new CountingLatch();
@@ -127,21 +130,41 @@ public class Pool<T> {
     }
 
     public Pool start() {
-        final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, new SchedulerThreadFactory());
-
-        if (this.scheduler.compareAndSet(null, scheduledExecutorService)) {
-            this.scheduler.get().scheduleAtFixedRate(sweeper, 0, this.sweepInterval, MILLISECONDS);
+        ScheduledExecutorService scheduledExecutorService = this.scheduler.get();
+        boolean createdSES = scheduledExecutorService == null;
+        if (scheduledExecutorService == null) {
+            scheduledExecutorService = Executors.newScheduledThreadPool(1, new SchedulerThreadFactory());
+            if (!this.scheduler.compareAndSet(null, scheduledExecutorService)) {
+                scheduledExecutorService.shutdownNow();
+                scheduledExecutorService = this.scheduler.get();
+                createdSES = false;
+            }
+        }
+        final ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(sweeper, 0, this.sweepInterval, MILLISECONDS);
+        if (!this.future.compareAndSet(null, scheduledFuture)) {
+            scheduledFuture.cancel(true);
+        }
+        if (!createdSES) {
+            // we don't want to shut it down; we'll just stop the task
+            this.scheduler.set(null);
         }
         return this;
     }
 
     public void stop() {
-        final ScheduledExecutorService scheduler = this.scheduler.get();
-        if (scheduler != null && this.scheduler.compareAndSet(scheduler, null)) {
+        final ScheduledFuture<?> future = this.future.getAndSet(null);
+        if (future != null
+                && !future.isDone() && !future.isCancelled()
+                && !future.cancel(false)) {
+            Logger.getLogger(Pool.class.getName()).log(Level.WARNING, "Pool scheduler task termination timeout expired");
+        }
+
+        final ScheduledExecutorService scheduler = this.scheduler.getAndSet(null);
+        if (scheduler != null) {
             scheduler.shutdown();
             try {
-                if (!scheduler.awaitTermination(10000, MILLISECONDS)) {
-                    Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Pool scheduler termination timeout expired");
+                if (!scheduler.awaitTermination(10, SECONDS)) { // should return almost immediately since the task was already cancelled
+                    Logger.getLogger(Pool.class.getName()).log(Level.WARNING, "Pool scheduler termination timeout expired");
                 }
             } catch (final InterruptedException e) {
                 //Ignore
@@ -150,7 +173,7 @@ public class Pool<T> {
     }
 
     public boolean running() {
-        return this.scheduler.get() != null;
+        return this.future.get() != null;
     }
 
     private Executor createExecutor() {
@@ -444,6 +467,9 @@ public class Pool<T> {
 
     public boolean close(final long timeout, final TimeUnit unit) throws InterruptedException {
 
+        // Stop the sweeper thread
+        stop();
+
         // drain all keys so no new instances will be accepted into the pool
         while (instances.tryAcquire()) {
             Thread.yield();
@@ -461,9 +487,6 @@ public class Pool<T> {
             //Ignore
         }
 
-        // Stop the sweeper thread
-        stop();
-
         // Drain all leases
         if (!(available instanceof Overdraft)) {
             while (available.tryAcquire()) {
@@ -659,7 +682,7 @@ public class Pool<T> {
                 while (true) {
                     final Entry entry = pop(0, MILLISECONDS, false);
                     if (entry == null) {
-                        push(entry, true);
+                        push(null, true);
                         break;
                     }
                     entries.add(entry);
@@ -771,7 +794,7 @@ public class Pool<T> {
             }
 
             for (int i = 0; i < replace.size(); i++) {
-                final long offset = maxAge > 0 ? (long) (maxAge / replace.size() * i * maxAgeOffset) % maxAge : 0l;
+                final long offset = maxAge > 0 ? (long) (maxAge / replace.size() * i * maxAgeOffset) % maxAge : 0L;
                 executor.execute(new Replace(replace.get(i).entry, offset));
             }
         }

http://git-wip-us.apache.org/repos/asf/tomee/blob/39b42eb6/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java b/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
index 6dc08cd..fbde94f 100644
--- a/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
+++ b/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
@@ -321,6 +321,7 @@ public class PoolTest extends TestCase {
 
         final long start = System.currentTimeMillis();
         assertTrue(pool.close(10, TimeUnit.SECONDS));
+        assertFalse(pool.running());
         final long time = System.currentTimeMillis() - start;
 
         // All instances should have been removed