You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@hc.apache.org by ol...@apache.org on 2020/05/03 13:35:04 UTC

[httpcomponents-core] 02/02: HTTPCORE-631: Revised i/o reactor shutdown sequence and resource de-allocation

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch HTTPCORE-631
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit 31deceb09644cbe369e40a0da4518ea6ea75e83c
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sun May 3 15:28:46 2020 +0200

    HTTPCORE-631: Revised i/o reactor shutdown sequence and resource de-allocation
---
 .../core5/reactor/AbstractSingleCoreIOReactor.java | 51 +++++++++++-----------
 .../hc/core5/reactor/MultiCoreIOReactor.java       | 12 ++---
 2 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java
index 86f32cc..af4584b 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java
@@ -33,6 +33,7 @@ import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.function.Callback;
@@ -45,6 +46,7 @@ abstract class AbstractSingleCoreIOReactor implements IOReactor {
 
     private final Callback<Exception> exceptionCallback;
     private final AtomicReference<IOReactorStatus> status;
+    private final AtomicBoolean terminated;
     private final Object shutdownMutex;
 
     final Selector selector;
@@ -54,6 +56,7 @@ abstract class AbstractSingleCoreIOReactor implements IOReactor {
         this.exceptionCallback = exceptionCallback;
         this.shutdownMutex = new Object();
         this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
+        this.terminated = new AtomicBoolean();
         try {
             this.selector = Selector.open();
         } catch (final IOException ex) {
@@ -87,27 +90,10 @@ abstract class AbstractSingleCoreIOReactor implements IOReactor {
             } finally {
                 try {
                     doTerminate();
-                    final Set<SelectionKey> keys = this.selector.keys();
-                    for (final SelectionKey key : keys) {
-                        try {
-                            Closer.close((Closeable) key.attachment());
-                        } catch (final IOException ex) {
-                            logException(ex);
-                        }
-                        key.channel().close();
-                    }
-                    try {
-                        this.selector.close();
-                    } catch (final IOException ex) {
-                        logException(ex);
-                    }
                 } catch (final Exception ex) {
                     logException(ex);
                 } finally {
-                    this.status.set(IOReactorStatus.SHUT_DOWN);
-                    synchronized (this.shutdownMutex) {
-                        this.shutdownMutex.notifyAll();
-                    }
+                    close(CloseMode.IMMEDIATE);
                 }
             }
         }
@@ -142,6 +128,9 @@ abstract class AbstractSingleCoreIOReactor implements IOReactor {
 
     @Override
     public final void close(final CloseMode closeMode) {
+        if (this.status.get() == IOReactorStatus.SHUT_DOWN) {
+            return;
+        }
         if (closeMode == CloseMode.GRACEFUL) {
             initiateShutdown();
             try {
@@ -149,15 +138,27 @@ abstract class AbstractSingleCoreIOReactor implements IOReactor {
             } catch (final InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
-        } else {
-            final IOReactorStatus previousStatus = this.status.getAndSet(IOReactorStatus.SHUT_DOWN);
-            if (previousStatus == IOReactorStatus.ACTIVE) {
-                this.selector.wakeup();
-            }
-            synchronized (this.shutdownMutex) {
-                this.shutdownMutex.notifyAll();
+        }
+        this.status.set(IOReactorStatus.SHUT_DOWN);
+        if (terminated.compareAndSet(false, true)) {
+            try {
+                final Set<SelectionKey> keys = this.selector.keys();
+                for (final SelectionKey key : keys) {
+                    try {
+                        Closer.close((Closeable) key.attachment());
+                    } catch (final IOException ex) {
+                        logException(ex);
+                    }
+                    key.channel().close();
+                }
+                selector.close();
+            } catch (final Exception ex) {
+                logException(ex);
             }
         }
+        synchronized (this.shutdownMutex) {
+            this.shutdownMutex.notifyAll();
+        }
     }
 
     @Override
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java
index d41d1f3..c7caf51 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java
@@ -28,6 +28,7 @@
 package org.apache.hc.core5.reactor;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.io.CloseMode;
@@ -40,12 +41,14 @@ class MultiCoreIOReactor implements IOReactor {
     private final IOReactor[] ioReactors;
     private final Thread[] threads;
     private final AtomicReference<IOReactorStatus> status;
+    private final AtomicBoolean terminated;
 
     MultiCoreIOReactor(final IOReactor[] ioReactors, final Thread[] threads) {
         super();
         this.ioReactors = ioReactors.clone();
         this.threads = threads.clone();
         this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
+        this.terminated = new AtomicBoolean();
     }
 
     @Override
@@ -105,8 +108,7 @@ class MultiCoreIOReactor implements IOReactor {
 
     @Override
     public final void close(final CloseMode closeMode) {
-        final IOReactorStatus currentStatus = this.status.get();
-        if (currentStatus == IOReactorStatus.INACTIVE || currentStatus == IOReactorStatus.SHUT_DOWN) {
+        if (this.status.get() == IOReactorStatus.SHUT_DOWN) {
             return;
         }
         if (closeMode == CloseMode.GRACEFUL) {
@@ -116,8 +118,9 @@ class MultiCoreIOReactor implements IOReactor {
             } catch (final InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
-        } else {
-            this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN);
+        }
+        this.status.set(IOReactorStatus.SHUT_DOWN);
+        if (this.terminated.compareAndSet(false, true)) {
             for (int i = 0; i < this.ioReactors.length; i++) {
                 Closer.close(this.ioReactors[i], CloseMode.IMMEDIATE);
             }
@@ -125,7 +128,6 @@ class MultiCoreIOReactor implements IOReactor {
                 this.threads[i].interrupt();
             }
         }
-        this.status.set(IOReactorStatus.SHUT_DOWN);
     }
 
     @Override