You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/08/21 21:09:08 UTC

[51/51] [abbrv] geode git commit: GEODE-3416: Reduce synchronization blockages in SocketCloser. Remove synchronization blocks around HashMap. Replace that implementation with simpler ThreadPool that is not unbounded and does not grow as the number of rem

GEODE-3416: Reduce synchronization blockages in SocketCloser.
Remove synchronization blocks around HashMap. Replace that implementation
with simpler ThreadPool that is not unbounded and does not grow as the
number of remoteAddress (clients/peers) are added


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/c6b20a91
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/c6b20a91
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/c6b20a91

Branch: refs/heads/feature/GEODE-3416
Commit: c6b20a91f701315639b12458f404075395016c87
Parents: b43f502
Author: Dave Barnes <db...@pivotal.io>
Authored: Thu Aug 10 17:11:50 2017 -0700
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Mon Aug 21 14:07:30 2017 -0700

----------------------------------------------------------------------
 .../cache/tier/sockets/CacheClientProxy.java    |  47 +-----
 .../apache/geode/internal/net/SocketCloser.java | 165 +++++++++++--------
 .../internal/net/SocketCloserJUnitTest.java     | 155 ++++++-----------
 3 files changed, 141 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/c6b20a91/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index d7e3548..34f232d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -181,11 +181,7 @@ public class CacheClientProxy implements ClientSession {
    * True if we are connected to a client.
    */
   private volatile boolean connected = false;
-  // /**
-  // * A string representing interest in all keys
-  // */
-  // protected static final String ALL_KEYS = "ALL_KEYS";
-  //
+
   /**
    * True if a marker message is still in the ha queue.
    */
@@ -459,47 +455,6 @@ public class CacheClientProxy implements ClientSession {
     return this.proxyID;
   }
 
-  // the following code was commented out simply because it was not used
-  // /**
-  // * Determines if the proxy represents the client host (and only the host, not
-  // * necessarily the exact VM running on the host)
-  // *
-  // * @return Whether the proxy represents the client host
-  // */
-  // protected boolean representsClientHost(String clientHost)
-  // {
-  // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
-  // return this._remoteHostAddress.equals(clientHost);
-  // }
-
-  // protected boolean representsClientVM(DistributedMember remoteMember)
-  // {
-  // // logger.warn("Is input port " + clientPort + " contained in " +
-  // // logger.warn("Does input host " + clientHost + " equal " +
-  // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
-  // // logger.warn("representsClientVM: " +
-  // // (representsClientHost(clientHost) && containsPort(clientPort)));
-  // return (proxyID.getDistributedMember().equals(remoteMember));
-  // }
-
-  // /**
-  // * Determines if the CacheClientUpdater proxied by this instance is listening
-  // * on the input clientHost and clientPort
-  // *
-  // * @param clientHost
-  // * The host name of the client to compare
-  // * @param clientPort
-  // * The port number of the client to compare
-  // *
-  // * @return Whether the CacheClientUpdater proxied by this instance is
-  // * listening on the input clientHost and clientPort
-  // */
-  // protected boolean representsCacheClientUpdater(String clientHost,
-  // int clientPort)
-  // {
-  // return (clientPort == this._socket.getPort() && representsClientHost(clientHost));
-  // }
-
   protected boolean isMember(ClientProxyMembershipID memberId) {
     return this.proxyID.equals(memberId);
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/c6b20a91/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index 6d86fd8..0a9a903 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -14,11 +14,17 @@
  */
 package org.apache.geode.internal.net;
 
+import org.apache.geode.SystemFailure;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThreadGroup;
+import org.apache.logging.log4j.Logger;
+
 import java.io.IOException;
 import java.net.Socket;
-import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -26,12 +32,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.SystemFailure;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.LoggingThreadGroup;
-
 /**
  * This class allows sockets to be closed without blocking. In some cases we have seen a call of
  * socket.close block for minutes. This class maintains a thread pool for every other member we have
@@ -57,7 +57,7 @@ public class SocketCloser {
    * will queue up waiting for a thread.
    */
   static final int ASYNC_CLOSE_POOL_MAX_THREADS =
-      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
+      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 4).intValue();
   /**
    * How many milliseconds the synchronous requester waits for the async close to happen. Default is
    * 0. Prior releases waited 50ms.
@@ -66,13 +66,16 @@ public class SocketCloser {
       Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
 
 
-  /** map of thread pools of async close threads */
-  private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new HashMap<>();
+  /**
+   * map of thread pools of async close threads
+   */
+  private final ConcurrentHashMap<String, ExecutorService> asyncCloseExecutors =
+      new ConcurrentHashMap<>();
   private final long asyncClosePoolKeepAliveSeconds;
   private final int asyncClosePoolMaxThreads;
   private final long asyncCloseWaitTime;
   private final TimeUnit asyncCloseWaitUnits;
-  private boolean closed;
+  private Boolean closed = Boolean.FALSE;
 
   public SocketCloser() {
     this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS,
@@ -96,26 +99,47 @@ public class SocketCloser {
     return this.asyncClosePoolMaxThreads;
   }
 
-  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
-      if (pool == null) {
-        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
-        ThreadFactory tf = new ThreadFactory() {
-          public Thread newThread(final Runnable command) {
-            Thread thread = new Thread(tg, command);
-            thread.setDaemon(true);
-            return thread;
-          }
-        };
-        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
-        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads,
-            this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
-        pool.allowCoreThreadTimeOut(true);
-        asyncCloseExecutors.put(address, pool);
+  private ExecutorService getAsyncThreadExecutor(String address) {
+    ExecutorService executorService = asyncCloseExecutors.get(address);
+    if (executorService == null) {
+      // To be used for pre-1.8 jdk releases.
+      // executorService = createThreadPoolExecutor();
+
+      executorService = getWorkStealingPool(asyncClosePoolMaxThreads);
+
+      ExecutorService previousThreadPoolExecutor =
+          asyncCloseExecutors.putIfAbsent(address, executorService);
+
+      if (previousThreadPoolExecutor != null) {
+        executorService.shutdownNow();
+        return previousThreadPoolExecutor;
       }
-      return pool;
     }
+    return executorService;
+  }
+
+  private ExecutorService getWorkStealingPool(int maxParallelThreads) {
+    return Executors.newWorkStealingPool(maxParallelThreads);
+  }
+
+  /**
+   * @deprecated since GEODE 1.3.0. Use @link{getWorkStealingPool}
+   */
+  @Deprecated
+  private ExecutorService createThreadPoolExecutor() {
+    final ThreadGroup threadGroup =
+        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+    ThreadFactory threadFactory = new ThreadFactory() {
+      public Thread newThread(final Runnable command) {
+        Thread thread = new Thread(threadGroup, command);
+        thread.setDaemon(true);
+        return thread;
+      }
+    };
+
+    return new ThreadPoolExecutor(asyncClosePoolMaxThreads, asyncClosePoolMaxThreads,
+        asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+        threadFactory);
   }
 
   /**
@@ -123,19 +147,11 @@ public class SocketCloser {
    * longer needed. Currently a thread pool is kept for each address and if you know that an address
    * no longer needs its pool then you should call this method.
    */
-  public void releaseResourcesForAddress(String address) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
-      if (pool != null) {
-        pool.shutdown();
-        asyncCloseExecutors.remove(address);
-      }
-    }
-  }
 
-  private boolean isClosed() {
-    synchronized (asyncCloseExecutors) {
-      return this.closed;
+  public void releaseResourcesForAddress(String address) {
+    ExecutorService executorService = asyncCloseExecutors.remove(address);
+    if (executorService != null) {
+      executorService.shutdown();
     }
   }
 
@@ -144,35 +160,22 @@ public class SocketCloser {
    * called then the asyncClose will be done synchronously.
    */
   public void close() {
-    synchronized (asyncCloseExecutors) {
+    synchronized (closed) {
       if (!this.closed) {
         this.closed = true;
-        for (ThreadPoolExecutor pool : asyncCloseExecutors.values()) {
-          pool.shutdown();
-        }
-        asyncCloseExecutors.clear();
+      } else {
+        return;
       }
     }
+    for (ExecutorService executorService : asyncCloseExecutors.values()) {
+      executorService.shutdown();
+    }
+    asyncCloseExecutors.clear();
   }
 
-  private void asyncExecute(String address, Runnable r) {
-    // Waiting 50ms for the async close request to complete is what the old (close per thread)
-    // code did. But now that we will not create a thread for every close request
-    // it seems better to let the thread that requested the close to move on quickly.
-    // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
-    // can be set to how many milliseconds to wait.
-    if (this.asyncCloseWaitTime == 0) {
-      getAsyncThreadExecutor(address).execute(r);
-    } else {
-      Future<?> future = getAsyncThreadExecutor(address).submit(r);
-      try {
-        future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
-      } catch (InterruptedException | ExecutionException | TimeoutException e) {
-        // We want this code to wait at most 50ms for the close to happen.
-        // It is ok to ignore these exception and let the close continue
-        // in the background.
-      }
-    }
+  private Future asyncExecute(String address, Runnable runnableToExecute) {
+    ExecutorService asyncThreadExecutor = getAsyncThreadExecutor(address);
+    return asyncThreadExecutor.submit(runnableToExecute);
   }
 
   /**
@@ -181,29 +184,30 @@ public class SocketCloser {
    * this method may block for a certain amount of time. If it is called after the SocketCloser is
    * closed then a normal synchronous close is done.
    * 
-   * @param sock the socket to close
+   * @param socket the socket to close
    * @param address identifies who the socket is connected to
    * @param extra an optional Runnable with stuff to execute in the async thread
    */
-  public void asyncClose(final Socket sock, final String address, final Runnable extra) {
-    if (sock == null || sock.isClosed()) {
+  public void asyncClose(final Socket socket, final String address, final Runnable extra) {
+    if (socket == null || socket.isClosed()) {
       return;
     }
     boolean doItInline = false;
     try {
-      synchronized (asyncCloseExecutors) {
-        if (isClosed()) {
+      Future submittedTask = null;
+      synchronized (closed) {
+        if (closed) {
           // this SocketCloser has been closed so do a synchronous, inline, close
           doItInline = true;
         } else {
-          asyncExecute(address, new Runnable() {
+          submittedTask = asyncExecute(address, new Runnable() {
             public void run() {
               Thread.currentThread().setName("AsyncSocketCloser for " + address);
               try {
                 if (extra != null) {
                   extra.run();
                 }
-                inlineClose(sock);
+                inlineClose(socket);
               } finally {
                 Thread.currentThread().setName("unused AsyncSocketCloser");
               }
@@ -211,6 +215,9 @@ public class SocketCloser {
           });
         }
       }
+      if (submittedTask != null) {
+        waitForFutureTaskWithTimeout(submittedTask);
+      }
     } catch (OutOfMemoryError ignore) {
       // If we can't start a thread to close the socket just do it inline.
       // See bug 50573.
@@ -220,16 +227,28 @@ public class SocketCloser {
       if (extra != null) {
         extra.run();
       }
-      inlineClose(sock);
+      inlineClose(socket);
     }
   }
 
+  private void waitForFutureTaskWithTimeout(Future submittedTask) {
+    if (this.asyncCloseWaitTime != 0) {
+      try {
+        submittedTask.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+      } catch (InterruptedException | ExecutionException | TimeoutException e) {
+        // We want this code to wait at most the asyncCloseWaitTime for the close to happen.
+        // It is ok to ignore these exception and let the close continue
+        // in the background.
+      }
+    }
+  }
 
   /**
    * Closes the specified socket
    * 
    * @param sock the socket to close
    */
+
   private static void inlineClose(final Socket sock) {
     // the next two statements are a mad attempt to fix bug
     // 36041 - segv in jrockit in pthread signaling code. This

http://git-wip-us.apache.org/repos/asf/geode/blob/c6b20a91/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
index 942cad4..a8b1d48 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
@@ -14,22 +14,21 @@
  */
 package org.apache.geode.internal.net;
 
-import static org.junit.Assert.*;
-
-import java.net.Socket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.UnitTest;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Tests the default SocketCloser.
@@ -62,86 +61,49 @@ public class SocketCloserJUnitTest {
    */
   @Test
   public void testAsync() {
-    final CountDownLatch cdl = new CountDownLatch(1);
+    final CountDownLatch countDownLatch = new CountDownLatch(1);
     final AtomicInteger waitingToClose = new AtomicInteger(0);
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          waitingToClose.incrementAndGet();
-          cdl.await();
-        } catch (InterruptedException e) {
-        }
-      }
-    };
 
     final int SOCKET_COUNT = 100;
-    final Socket[] aSockets = new Socket[SOCKET_COUNT];
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      aSockets[i] = createClosableSocket();
-    }
-    // Schedule a 100 sockets for async close.
-    // They should all be stuck on cdl.
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      this.socketCloser.asyncClose(aSockets[i], "A", r);
-    }
-    // Make sure the sockets have not been closed
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      assertEquals(false, aSockets[i].isClosed());
-    }
-    final Socket[] bSockets = new Socket[SOCKET_COUNT];
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      bSockets[i] = createClosableSocket();
-    }
+    final int REMOTE_CLIENT_COUNT = 200;
+
+    List<Socket> trackedSockets = new ArrayList<>();
     // Schedule a 100 sockets for async close.
-    // They should all be stuck on cdl.
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      this.socketCloser.asyncClose(bSockets[i], "B", r);
-    }
-    // Make sure the sockets have not been closed
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      assertEquals(false, bSockets[i].isClosed());
+    // They should all be stuck on countDownLatch.
+    for (int i = 0; i < REMOTE_CLIENT_COUNT; i++) {
+      Socket[] aSockets = new Socket[SOCKET_COUNT];
+      String address = i + "";
+      for (int j = 0; j < SOCKET_COUNT; j++) {
+        aSockets[j] = createClosableSocket();
+        trackedSockets.add(aSockets[j]);
+        this.socketCloser.asyncClose(aSockets[j], address, () -> {
+          try {
+            waitingToClose.incrementAndGet();
+            countDownLatch.await();
+          } catch (InterruptedException e) {
+          }
+        });
+      }
     }
+
     // close the socketCloser first to verify that the sockets
     // that have already been scheduled will be still be closed.
-    this.socketCloser.releaseResourcesForAddress("A");
     this.socketCloser.close();
-    // Each thread pool (one for A and one for B) has a max of 8 threads.
-    // So verify that this many are currently waiting on cdl.
-    {
-      final int maxThreads = this.socketCloser.getMaxThreads();
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          return waitingToClose.get() == 2 * maxThreads;
-        }
-
-        public String description() {
-          return "expected " + 2 * maxThreads + " waiters but found only " + waitingToClose.get();
-        }
-      };
-      Wait.waitForCriterion(wc, 5000, 10, true);
-    }
-    // now count down the latch that allows the sockets to close
-    cdl.countDown();
+    countDownLatch.countDown();
     // now all the sockets should get closed; use a wait criteria
     // since a thread pool is doing to closes
-    {
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          for (int i = 0; i < SOCKET_COUNT; i++) {
-            if (!aSockets[i].isClosed() || !bSockets[i].isClosed()) {
-              return false;
-            }
-          }
-          return true;
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+      boolean areAllClosed = true;
+      for (Iterator<Socket> iterator = trackedSockets.iterator(); iterator.hasNext();) {
+        Socket socket = iterator.next();
+        if (socket.isClosed()) {
+          iterator.remove();
+          continue;
         }
-
-        public String description() {
-          return "one or more sockets did not close";
-        }
-      };
-      Wait.waitForCriterion(wc, 5000, 10, true);
-    }
+        areAllClosed = false;
+      }
+      return areAllClosed;
+    });
   }
 
   /**
@@ -150,18 +112,11 @@ public class SocketCloserJUnitTest {
   @Test
   public void testClosedSocket() throws Exception {
     final AtomicBoolean runnableCalled = new AtomicBoolean();
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        runnableCalled.set(true);
-      }
-    };
 
     Socket s = createClosableSocket();
     s.close();
-    this.socketCloser.asyncClose(s, "A", r);
-    Wait.pause(10);
-    assertEquals(false, runnableCalled.get());
+    this.socketCloser.asyncClose(s, "A", () -> runnableCalled.set(true));
+    Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> !runnableCalled.get());
   }
 
   /**
@@ -170,25 +125,11 @@ public class SocketCloserJUnitTest {
   @Test
   public void testClosedSocketCloser() {
     final AtomicBoolean runnableCalled = new AtomicBoolean();
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        runnableCalled.set(true);
-      }
-    };
 
-    final Socket s = createClosableSocket();
+    final Socket closableSocket = createClosableSocket();
     this.socketCloser.close();
-    this.socketCloser.asyncClose(s, "A", r);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        return runnableCalled.get() && s.isClosed();
-      }
-
-      public String description() {
-        return "runnable was not called or socket was not closed";
-      }
-    };
-    Wait.waitForCriterion(wc, 5000, 10, true);
+    this.socketCloser.asyncClose(closableSocket, "A", () -> runnableCalled.set(true));
+    Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        .until(() -> runnableCalled.get() && closableSocket.isClosed());
   }
 }