You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2015/09/18 23:33:50 UTC

[1/2] incubator-geode git commit: An atomic boolean is now used in Connection to make sure the socket is only scheduled once for asyncClose.

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-332 16a6bc1a6 -> 159d78167


An atomic boolean is now used in Connection to make sure the socket is only
scheduled once for asyncClose.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/718bb3f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/718bb3f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/718bb3f0

Branch: refs/heads/feature/GEODE-332
Commit: 718bb3f0f9b3fcff719c57d5143905d5e6aecc32
Parents: 16a6bc1
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Sep 18 10:30:44 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Fri Sep 18 10:30:44 2015 -0700

----------------------------------------------------------------------
 .../java/com/gemstone/gemfire/internal/tcp/Connection.java     | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/718bb3f0/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index b341acc..6db01bd 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -818,6 +818,8 @@ public class Connection implements Runnable {
     }
   }
   
+  private final AtomicBoolean asyncCloseCalled = new AtomicBoolean();
+  
   /**
    * asynchronously close this connection
    * 
@@ -845,7 +847,9 @@ public class Connection implements Runnable {
       r.run();
     }
     else {
-      this.owner.getSocketCloser().asyncClose(this.socket, String.valueOf(this.remoteAddr), r);
+      if (this.asyncCloseCalled.compareAndSet(false, true)) {
+        this.owner.getSocketCloser().asyncClose(this.socket, String.valueOf(this.remoteAddr), r);
+      }
     }
   }
   


[2/2] incubator-geode git commit: Added unit test for SocketCloser. Also found that the pool was never creating more than one thread because the queue was unlimited. Now the core threads are set to the same value as max threads and allowCoreThreadTimeOut is set to true.

Posted by ds...@apache.org.
Added unit test for SocketCloser.
Also found that the pool was never creating more than one thread because the work queue was unbounded (a ThreadPoolExecutor only starts threads beyond the core size when its queue is full, and the core size was 1).
Now the core thread count is set to the same value as the max thread count, and allowCoreThreadTimeOut is set to true so that idle threads still time out.
Also changed close to call shutdown instead of shutdownNow so that any pending closes will still be attempted.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/159d7816
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/159d7816
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/159d7816

Branch: refs/heads/feature/GEODE-332
Commit: 159d7816774bea05aefc6a7209b56dbfb0092650
Parents: 718bb3f
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Sep 18 14:29:41 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Fri Sep 18 14:29:41 2015 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/internal/SocketCloser.java |  30 ++--
 .../gemfire/internal/SocketCloserJUnitTest.java | 179 +++++++++++++++++++
 .../internal/SocketCloserWithWaitJUnitTest.java |  21 +++
 3 files changed, 218 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/159d7816/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
index 04ca0d8..be98db7 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
@@ -38,30 +38,32 @@ import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 public class SocketCloser {
   private static final Logger logger = LogService.getLogger();
   /** Number of seconds to wait before timing out an unused async close thread. Default is 120 (2 minutes). */
-  private static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS = Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
+  static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS = Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
   /** Maximum number of threads that can be doing a socket close. Any close requests over this max will queue up waiting for a thread. */
-  private static final int ASYNC_CLOSE_POOL_MAX_THREADS = Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
+  static final int ASYNC_CLOSE_POOL_MAX_THREADS = Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
   /** How many milliseconds the synchronous requester waits for the async close to happen. Default is 0. Prior releases waited 50ms. */ 
-  private static final long ASYNC_CLOSE_WAIT_MILLISECONDS = Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
+  static final long ASYNC_CLOSE_WAIT_MILLISECONDS = Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
   
 
   /** map of thread pools of async close threads */
   private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new HashMap<>();
   private final long asyncClosePoolKeepAliveSeconds;
   private final int asyncClosePoolMaxThreads;
-  private final long asyncCloseWaitMilliseconds;
+  private final long asyncCloseWaitTime;
+  private final TimeUnit asyncCloseWaitUnits;
   private boolean closed;
   
   public SocketCloser() {
-    this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS, ASYNC_CLOSE_WAIT_MILLISECONDS);
+    this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS, ASYNC_CLOSE_WAIT_MILLISECONDS, TimeUnit.MILLISECONDS);
   }
   public SocketCloser(int asyncClosePoolMaxThreads) {
-    this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, asyncClosePoolMaxThreads, ASYNC_CLOSE_WAIT_MILLISECONDS);
+    this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, asyncClosePoolMaxThreads, ASYNC_CLOSE_WAIT_MILLISECONDS, TimeUnit.MILLISECONDS);
   }
-  public SocketCloser(long asyncClosePoolKeepAliveSeconds, int asyncClosePoolMaxThreads, long asyncCloseWaitMilliseconds) {
+  public SocketCloser(long asyncClosePoolKeepAliveSeconds, int asyncClosePoolMaxThreads, long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
     this.asyncClosePoolKeepAliveSeconds = asyncClosePoolKeepAliveSeconds;
     this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
-    this.asyncCloseWaitMilliseconds = asyncCloseWaitMilliseconds;
+    this.asyncCloseWaitTime = asyncCloseWaitTime;
+    this.asyncCloseWaitUnits = asyncCloseWaitUnits;
   }
 
   private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
@@ -77,7 +79,8 @@ public class SocketCloser {
           } 
         }; 
         BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); 
-        pool = new ThreadPoolExecutor(1, this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
+        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
+        pool.allowCoreThreadTimeOut(true);
         asyncCloseExecutors.put(address, pool);
       }
       return pool;
@@ -114,7 +117,7 @@ public class SocketCloser {
       if (!this.closed) {
         this.closed = true;
         for (ThreadPoolExecutor pool: asyncCloseExecutors.values()) {
-          pool.shutdownNow();
+          pool.shutdown();
         }
         asyncCloseExecutors.clear();
       }
@@ -126,12 +129,12 @@ public class SocketCloser {
     // it seems better to let the thread that requested the close to move on quickly.
     // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
     // can be set to how many milliseconds to wait.
-    if (this.asyncCloseWaitMilliseconds == 0) {
+    if (this.asyncCloseWaitTime == 0) {
       getAsyncThreadExecutor(address).execute(r);
     } else {
       Future<?> future = getAsyncThreadExecutor(address).submit(r);
       try {
-        future.get(this.asyncCloseWaitMilliseconds, TimeUnit.MILLISECONDS);
+        future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
       } catch (InterruptedException | ExecutionException | TimeoutException e) {
         // We want this code to wait at most 50ms for the close to happen.
         // It is ok to ignore these exception and let the close continue
@@ -182,6 +185,9 @@ public class SocketCloser {
       doItInline = true;
     }
     if (doItInline) {
+      if (extra != null) {
+        extra.run();
+      }
       inlineClose(sock);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/159d7816/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java
new file mode 100644
index 0000000..681b2a7
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java
@@ -0,0 +1,179 @@
+package com.gemstone.gemfire.internal;
+
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+import dunit.DistributedTestCase;
+import dunit.DistributedTestCase.WaitCriterion;
+
+/**
+ * Tests the default SocketCloser.
+ */
+@Category(UnitTest.class)
+public class SocketCloserJUnitTest {
+
+  private SocketCloser socketCloser;
+  
+  @Before
+  public void setUp() throws Exception {
+    this.socketCloser = createSocketCloser();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    this.socketCloser.close();
+  }
+  
+  private Socket createClosableSocket() {
+    return new Socket();
+  }
+
+  protected SocketCloser createSocketCloser() {
+    return new SocketCloser();
+  }
+  
+  /**
+   * Test that close requests are async.
+   */
+  @Test
+  public void testAsync() {
+    final CountDownLatch cdl = new CountDownLatch(1);
+    final AtomicInteger waitingToClose = new AtomicInteger(0);
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          waitingToClose.incrementAndGet();
+          cdl.await();
+        } catch (InterruptedException e) {
+        }
+      }
+    };
+    
+    final int SOCKET_COUNT = 100;
+    final Socket[] aSockets = new Socket[SOCKET_COUNT];
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      aSockets[i] = createClosableSocket();
+    }
+    // Schedule a 100 sockets for async close.
+    // They should all be stuck on cdl.
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      this.socketCloser.asyncClose(aSockets[i], "A", r);
+    }
+    // Make sure the sockets have not been closed
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      assertEquals(false, aSockets[i].isClosed());
+    }
+    final Socket[] bSockets = new Socket[SOCKET_COUNT];
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      bSockets[i] = createClosableSocket();
+    }
+    // Schedule a 100 sockets for async close.
+    // They should all be stuck on cdl.
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      this.socketCloser.asyncClose(bSockets[i], "B", r);
+    }
+    // Make sure the sockets have not been closed
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      assertEquals(false, bSockets[i].isClosed());
+    }
+    // close the socketCloser first to verify that the sockets
+    // that have already been scheduled will be still be closed.
+    this.socketCloser.releaseResourcesForAddress("A");
+    this.socketCloser.close();
+    // Each thread pool (one for A and one for B) has a max of 8 threads.
+    // So verify that this many are currently waiting on cdl.
+    {
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          return waitingToClose.get() == 2*8;
+        }
+        public String description() {
+          return "expected 16 waiters but found only " + waitingToClose.get();
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 5000, 10, true);
+    }
+    // now count down the latch that allows the sockets to close
+    cdl.countDown();
+    // now all the sockets should get closed; use a wait criteria
+    // since a thread pool is doing to closes
+    {
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          for (int i=0; i < SOCKET_COUNT; i++) {
+            if (!aSockets[i].isClosed() || !bSockets[i].isClosed()) {
+              return false;
+            }
+          }
+          return true;
+        }
+        public String description() {
+          return "one or more sockets did not close";
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 5000, 10, true);
+    }
+  }
+  
+  /**
+   * Verify that requesting an asyncClose on an already
+   * closed socket is a noop.
+   */
+  @Test
+  public void testClosedSocket() throws IOException {
+    final AtomicBoolean runnableCalled = new AtomicBoolean();
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        runnableCalled.set(true);
+      }
+    };
+    
+    Socket s = createClosableSocket();
+    s.close();
+    this.socketCloser.asyncClose(s, "A", r);
+    DistributedTestCase.pause(10);
+    assertEquals(false, runnableCalled.get());
+  }
+  
+  /**
+   * Verify that a closed SocketCloser will still close an open socket
+   */
+  @Test
+  public void testClosedSocketCloser() {
+    final AtomicBoolean runnableCalled = new AtomicBoolean();
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        runnableCalled.set(true);
+      }
+    };
+    
+    final Socket s = createClosableSocket();
+    this.socketCloser.close();
+    this.socketCloser.asyncClose(s, "A", r);
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        return runnableCalled.get() && s.isClosed(); 
+      }
+      public String description() {
+        return "runnable was not called or socket was not closed";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, 5000, 10, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/159d7816/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java
new file mode 100644
index 0000000..5f3eb5b
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java
@@ -0,0 +1,21 @@
+package com.gemstone.gemfire.internal;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * Tests SocketCloser with a wait time. The default SocketCloser does not wait.
+ */
+@Category(UnitTest.class)
+public class SocketCloserWithWaitJUnitTest extends SocketCloserJUnitTest {
+  @Override
+  protected SocketCloser createSocketCloser() {
+    return new SocketCloser(
+        SocketCloser.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS,
+        SocketCloser.ASYNC_CLOSE_POOL_MAX_THREADS,
+        1, TimeUnit.NANOSECONDS);
+  }
+}