Posted to dev@geode.apache.org by kohlmu-pivotal <gi...@git.apache.org> on 2017/08/09 21:18:46 UTC

[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

GitHub user kohlmu-pivotal opened a pull request:

    https://github.com/apache/geode/pull/702

    GEODE-3416: Reduce synchronization blockages in SocketCloser.

    Remove the synchronized blocks around the HashMap. Replace that implementation
    with a simpler thread pool that is bounded and does not grow as remote
    addresses (clients/peers) are added.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/geode feature/GEODE-3416

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/geode/pull/702.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #702
    
----
commit 56d7707e12ecdecbc1881a9ac1575fac854e8fe9
Author: Udo Kohlmeyer <uk...@pivotal.io>
Date:   2017-08-09T21:17:59Z

    GEODE-3416: Reduce synchronization blockages in SocketCloser.
    Remove synchronization blocks around HashMap. Replace that implementation
    with simpler ThreadPool that is not unbounded and does not grow as the
    number of remoteAddress (clients/peers) are added

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by WireBaron <gi...@git.apache.org>.
Github user WireBaron commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133230687
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -144,35 +156,22 @@ private boolean isClosed() {
        * called then the asyncClose will be done synchronously.
        */
       public void close() {
    -    synchronized (asyncCloseExecutors) {
    +    synchronized (closed) {
           if (!this.closed) {
             this.closed = true;
    -        for (ThreadPoolExecutor pool : asyncCloseExecutors.values()) {
    -          pool.shutdown();
    -        }
    -        asyncCloseExecutors.clear();
    +      } else {
    +        return;
           }
         }
    +    for (ExecutorService executorService : asyncCloseExecutors.values()) {
    +      executorService.shutdown();
    +      asyncCloseExecutors.clear();
    --- End diff --
    
    Shouldn't the `asyncCloseExecutors.clear()` call be outside the for loop?
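
To make the question concrete, here is a minimal sketch of how the shutdown loop could look with the clear() call moved after it. It is not the code from the PR: `CloserSketch`, `executorFor` and `poolCount` are hypothetical names, and using an `AtomicBoolean` for the closed flag is an assumption made here, because `synchronized` needs an object reference to lock on. With a concurrent map, clearing inside the loop may leave later executors unvisited, since the map's iterators are only weakly consistent.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for SocketCloser; only the close() path is sketched.
class CloserSketch {
  private final ConcurrentHashMap<String, ExecutorService> asyncCloseExecutors =
      new ConcurrentHashMap<>();
  // Assumption: an AtomicBoolean replaces the boolean flag and its lock.
  private final AtomicBoolean closed = new AtomicBoolean(false);

  // Hypothetical helper so the sketch has pools to shut down.
  ExecutorService executorFor(String address) {
    return asyncCloseExecutors.computeIfAbsent(address, a -> Executors.newFixedThreadPool(1));
  }

  boolean isClosed() {
    return closed.get();
  }

  int poolCount() {
    return asyncCloseExecutors.size();
  }

  void close() {
    // compareAndSet lets exactly one caller proceed past this point.
    if (!closed.compareAndSet(false, true)) {
      return;
    }
    for (ExecutorService executorService : asyncCloseExecutors.values()) {
      executorService.shutdown();
    }
    // Clear once, after every pool has been asked to shut down.
    asyncCloseExecutors.clear();
  }
}
```

Calling `close()` a second time returns immediately, so the loop and the clear each run at most once per closer.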



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/geode/pull/702



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by hiteshk25 <gi...@git.apache.org>.
Github user hiteshk25 commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133249141
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java ---
    @@ -929,10 +929,6 @@ protected void removeEndpoint(DistributedMember memberID, String reason,
                   owner.getDM().getMembershipManager().getShutdownCause());
             }
           }
    -
    -      if (remoteAddress != null) {
    -        this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
    -      }
    --- End diff --
    
    Is this call no longer required?



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by bschuchardt <gi...@git.apache.org>.
Github user bschuchardt commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r134339691
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -96,46 +99,55 @@ public int getMaxThreads() {
         return this.asyncClosePoolMaxThreads;
       }
     
    -  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
    -    synchronized (asyncCloseExecutors) {
    -      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
    -      if (pool == null) {
    -        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
    -        ThreadFactory tf = new ThreadFactory() {
    -          public Thread newThread(final Runnable command) {
    -            Thread thread = new Thread(tg, command);
    -            thread.setDaemon(true);
    -            return thread;
    -          }
    -        };
    -        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    -        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads,
    -            this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
    -        pool.allowCoreThreadTimeOut(true);
    -        asyncCloseExecutors.put(address, pool);
    +  private ExecutorService getAsyncThreadExecutor(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.get(address);
    +    if (executorService == null) {
    +      // To be used for pre-1.8 jdk releases.
    +      // createThreadPool();
    +
    +      executorService = Executors.newWorkStealingPool(asyncClosePoolMaxThreads);
    +
    +      ExecutorService previousThreadPoolExecutor =
    +          asyncCloseExecutors.putIfAbsent(address, executorService);
    +
    +      if (previousThreadPoolExecutor != null) {
    +        executorService.shutdownNow();
    +        return previousThreadPoolExecutor;
           }
    -      return pool;
         }
    +    return executorService;
    +  }
    +
    +  /**
    +   * @deprecated this method is to be used for pre 1.8 jdk.
    +   */
    +  @Deprecated
    +  private void createThreadPool() {
    +    ExecutorService executorService;
    +    final ThreadGroup threadGroup =
    +        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
    +    ThreadFactory threadFactory = new ThreadFactory() {
    +      public Thread newThread(final Runnable command) {
    +        Thread thread = new Thread(threadGroup, command);
    +        thread.setDaemon(true);
    +        return thread;
    +      }
    +    };
    +
    +    executorService = new ThreadPoolExecutor(asyncClosePoolMaxThreads, asyncClosePoolMaxThreads,
    +        asyncCloseWaitTime, asyncCloseWaitUnits, new LinkedBlockingQueue<>(), threadFactory);
       }
     
       /**
        * Call this method if you know all the resources in the closer for the given address are no
        * longer needed. Currently a thread pool is kept for each address and if you know that an address
        * no longer needs its pool then you should call this method.
        */
    -  public void releaseResourcesForAddress(String address) {
    -    synchronized (asyncCloseExecutors) {
    -      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
    -      if (pool != null) {
    -        pool.shutdown();
    -        asyncCloseExecutors.remove(address);
    -      }
    -    }
    -  }
     
    -  private boolean isClosed() {
    -    synchronized (asyncCloseExecutors) {
    -      return this.closed;
    +  public void releaseResourcesForAddress(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.remove(address);
    +    if (executorService != null) {
    +      executorService.shutdown();
    --- End diff --
    
    @kohlmu-pivotal I don't believe that I said it was different behavior.  I said it needs extensive testing.



[GitHub] geode issue #702: GEODE-3416: Reduce synchronization blockages in SocketClos...

Posted by kohlmu-pivotal <gi...@git.apache.org>.
Github user kohlmu-pivotal commented on the issue:

    https://github.com/apache/geode/pull/702
  
    @bschuchardt @galen-pivotal @hiteshk25 @pivotal-amurmann @dschneider-pivotal @WireBaron @kirklund 



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by kohlmu-pivotal <gi...@git.apache.org>.
Github user kohlmu-pivotal commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133238599
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -96,46 +99,55 @@ public int getMaxThreads() {
    +  public void releaseResourcesForAddress(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.remove(address);
    +    if (executorService != null) {
    +      executorService.shutdown();
    --- End diff --
    
    `ExecutorService.shutdown()` protects itself internally with locks, etc., so we don't have to worry about multiple threads calling shutdown on the same executor service.
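
The claim can be checked with a small experiment. The sketch below is illustrative only: `ShutdownTwiceSketch` and `shutdownFromManyThreads` are hypothetical names, and a fixed thread pool stands in for whichever executor the closer ends up using. It relies on the documented contract that invoking `shutdown()` has no additional effect if the executor is already shut down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownTwiceSketch {
  // Calls shutdown() on one pool from several threads at once. Returns true
  // if no caller saw an exception and the pool terminated within 5 seconds.
  static boolean shutdownFromManyThreads(int callers) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(() -> { }); // submit a trivial task so a worker thread exists
    CountDownLatch start = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(callers);
    AtomicInteger failures = new AtomicInteger();
    for (int i = 0; i < callers; i++) {
      new Thread(() -> {
        try {
          start.await(); // line the callers up so they race on shutdown()
          pool.shutdown();
        } catch (Throwable t) {
          failures.incrementAndGet();
        } finally {
          done.countDown();
        }
      }).start();
    }
    start.countDown();
    try {
      done.await();
      return failures.get() == 0 && pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

A passing run does not prove the absence of every race, but it matches what the `ExecutorService` contract promises about repeated shutdown calls.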



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by hiteshk25 <gi...@git.apache.org>.
Github user hiteshk25 commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133506959
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -96,46 +99,55 @@ public int getMaxThreads() {
    +  public void releaseResourcesForAddress(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.remove(address);
    +    if (executorService != null) {
    +      executorService.shutdown();
    --- End diff --
    
    Thanks, Bruce. So we close the socket closer after closing all the connections? So we just have an executor in sync (but the socket close always happens in another thread).



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by kohlmu-pivotal <gi...@git.apache.org>.
Github user kohlmu-pivotal commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133255222
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -96,46 +99,55 @@ public int getMaxThreads() {
    +  public void releaseResourcesForAddress(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.remove(address);
    +    if (executorService != null) {
    +      executorService.shutdown();
    --- End diff --
    
    Correct and agreed. When the cache closes, I imagine SocketCloser.close() is called, which means that any socket closed after that point would be closed in-line.
    I don't know what the expected behavior is for closing sockets while the cache is closing. But you are correct that closing a socket in-line could hold up the shutdown of the cache.
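
The in-line behavior described here can be sketched as follows. This is an illustration under stated assumptions, not the SocketCloser implementation: `InlineFallbackSketch` is a hypothetical class, a single fixed pool replaces the per-address pools, and the `Runnable` stands in for the actual socket close.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

class InlineFallbackSketch {
  private final ExecutorService pool = Executors.newFixedThreadPool(1);
  private final AtomicBoolean closed = new AtomicBoolean(false);

  // Runs closeSocket on the pool while the closer is open, otherwise on the
  // calling thread. Returns true if the task ran in-line.
  boolean asyncClose(Runnable closeSocket) {
    if (!closed.get()) {
      try {
        pool.execute(closeSocket);
        return false;
      } catch (RejectedExecutionException e) {
        // close() shut the pool down between the check and the submit;
        // the task was not accepted, so fall through and run it in-line.
      }
    }
    closeSocket.run(); // the caller blocks for as long as the close takes
    return true;
  }

  void close() {
    if (closed.compareAndSet(false, true)) {
      pool.shutdown();
    }
  }
}
```

In this sketch, any `asyncClose` call made after `close()` blocks its caller, which is the situation in which a slow socket close could delay cache shutdown.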




[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by WireBaron <gi...@git.apache.org>.
Github user WireBaron commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133230527
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -96,46 +99,55 @@ public int getMaxThreads() {
    +  public void releaseResourcesForAddress(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.remove(address);
    +    if (executorService != null) {
    +      executorService.shutdown();
    --- End diff --
    
    This could still race with a thread running the close() method below and both could end up calling shutdown on the same service.



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by kohlmu-pivotal <gi...@git.apache.org>.
Github user kohlmu-pivotal commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133113689
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -96,46 +99,55 @@ public int getMaxThreads() {
    +  public void releaseResourcesForAddress(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.remove(address);
    +    if (executorService != null) {
    +      executorService.shutdown();
    --- End diff --
    
    Given that `remove` is called on the concurrent hash map, only one caller can ever receive a non-null executor for a given address, so this branch is entered at most once per pool. This should not suffer from reentrancy problems.
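
The argument above can be checked in isolation. The following is a minimal sketch, not Geode code: the class name `RemoveOnceSketch`, the address string, and the caller count are made up for illustration. It races several threads on `ConcurrentHashMap.remove` for the same key and counts how many of them get a non-null pool back.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the releaseResourcesForAddress logic; not the Geode class.
class RemoveOnceSketch {
  private static final String ADDRESS = "10.0.0.1:40404"; // made-up address

  /** Races `callers` threads on remove(ADDRESS) and returns how many saw a non-null pool. */
  static int countWinners(int callers) {
    ConcurrentHashMap<String, ExecutorService> executors = new ConcurrentHashMap<>();
    executors.put(ADDRESS, Executors.newSingleThreadExecutor());

    AtomicInteger winners = new AtomicInteger();
    CountDownLatch start = new CountDownLatch(1);
    ExecutorService racers = Executors.newFixedThreadPool(callers);
    for (int i = 0; i < callers; i++) {
      racers.execute(() -> {
        try {
          start.await(); // line all callers up before racing
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        // remove() is atomic: the mapping is handed to exactly one caller.
        ExecutorService pool = executors.remove(ADDRESS);
        if (pool != null) {
          winners.incrementAndGet();
          pool.shutdown();
        }
      });
    }
    start.countDown();
    racers.shutdown();
    try {
      racers.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return winners.get();
  }

  public static void main(String[] args) {
    System.out.println("callers that shut the pool down: " + countWinners(8));
  }
}
```

Separately from the map, the `ExecutorService.shutdown()` contract states that invoking it on an already shut down executor has no additional effect, so a repeated call would be harmless in any case.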


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] geode issue #702: GEODE-3416: Reduce synchronization blockages in SocketClos...

Posted by kohlmu-pivotal <gi...@git.apache.org>.
Github user kohlmu-pivotal commented on the issue:

    https://github.com/apache/geode/pull/702
  
    @bschuchardt @dschneider-pivotal is on the review of this PR.



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by galen-pivotal <gi...@git.apache.org>.
Github user galen-pivotal commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r132869478
  
    --- Diff: geode-book/master_middleman/bookbinder_helpers.rb ---
    @@ -1,298 +0,0 @@
    -# Licensed to the Apache Software Foundation (ASF) under one
    --- End diff --
    
    Why does Github think that this is gone? If I pull your fork and `git diff origin/develop...origin/feature/GEODE-3416`, I don't see any change to this file.



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by pivotal-amurmann <gi...@git.apache.org>.
Github user pivotal-amurmann commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133252327
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -96,46 +99,56 @@ public int getMaxThreads() {
         return this.asyncClosePoolMaxThreads;
       }
     
    -  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
    -    synchronized (asyncCloseExecutors) {
    -      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
    -      if (pool == null) {
    -        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
    -        ThreadFactory tf = new ThreadFactory() {
    -          public Thread newThread(final Runnable command) {
    -            Thread thread = new Thread(tg, command);
    -            thread.setDaemon(true);
    -            return thread;
    -          }
    -        };
    -        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    -        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads,
    -            this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
    -        pool.allowCoreThreadTimeOut(true);
    -        asyncCloseExecutors.put(address, pool);
    +  private ExecutorService getAsyncThreadExecutor(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.get(address);
    +    if (executorService == null) {
    +      // To be used for pre-1.8 jdk releases.
    +      // createThreadPool();
    +
    +      executorService = Executors.newWorkStealingPool(asyncClosePoolMaxThreads);
    +
    +      ExecutorService previousThreadPoolExecutor =
    +          asyncCloseExecutors.putIfAbsent(address, executorService);
    +
    +      if (previousThreadPoolExecutor != null) {
    +        executorService.shutdownNow();
    +        return previousThreadPoolExecutor;
           }
    -      return pool;
         }
    +    return executorService;
    +  }
    +
    +  /**
    +   * @deprecated this method is to be used for pre 1.8 jdk.
    --- End diff --
    
    I always appreciate it as a consumer if deprecations contain information about what the new solution is for the problem that the deprecated code was solving.
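
As an illustration of that suggestion, a deprecation note can name the replacement directly. This is only a sketch: the class name `DeprecationDocSketch`, the method `createWorkStealingPool`, and the 120-second keep-alive are assumptions made for the example and do not come from the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class; shows one way to word a deprecation, not the PR's final code.
class DeprecationDocSketch {
  /**
   * Creates a fixed-size pool of daemon threads.
   *
   * @deprecated Only needed on pre-1.8 JDKs, which lack
   *             {@code Executors.newWorkStealingPool}. On JDK 1.8 and later use
   *             {@link #createWorkStealingPool(int)} instead.
   */
  @Deprecated
  static ExecutorService createThreadPool(int maxThreads) {
    ThreadFactory daemonFactory = runnable -> {
      Thread thread = new Thread(runnable);
      thread.setDaemon(true);
      return thread;
    };
    // The keep-alive value is an arbitrary placeholder for this sketch.
    return new ThreadPoolExecutor(maxThreads, maxThreads, 120, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(), daemonFactory);
  }

  /** Replacement for {@link #createThreadPool(int)} on JDK 1.8 and later. */
  static ExecutorService createWorkStealingPool(int parallelism) {
    return Executors.newWorkStealingPool(parallelism);
  }
}
```

With the `{@link ...}` in place, a reader who lands on the deprecated method is pointed at the method to call instead.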



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by kohlmu-pivotal <gi...@git.apache.org>.
Github user kohlmu-pivotal commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133353995
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java ---
    @@ -929,10 +929,6 @@ protected void removeEndpoint(DistributedMember memberID, String reason,
                   owner.getDM().getMembershipManager().getShutdownCause());
             }
           }
    -
    -      if (remoteAddress != null) {
    -        this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
    -      }
    --- End diff --
    
    It does; this is a missed revert.



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by WireBaron <gi...@git.apache.org>.
Github user WireBaron commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133094233
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -96,46 +99,55 @@ public int getMaxThreads() {
         return this.asyncClosePoolMaxThreads;
       }
     
    -  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
    -    synchronized (asyncCloseExecutors) {
    -      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
    -      if (pool == null) {
    -        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
    -        ThreadFactory tf = new ThreadFactory() {
    -          public Thread newThread(final Runnable command) {
    -            Thread thread = new Thread(tg, command);
    -            thread.setDaemon(true);
    -            return thread;
    -          }
    -        };
    -        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    -        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads,
    -            this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
    -        pool.allowCoreThreadTimeOut(true);
    -        asyncCloseExecutors.put(address, pool);
    +  private ExecutorService getAsyncThreadExecutor(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.get(address);
    +    if (executorService == null) {
    +      // To be used for pre-1.8 jdk releases.
    +      // createThreadPool();
    +
    +      executorService = Executors.newWorkStealingPool(asyncClosePoolMaxThreads);
    +
    +      ExecutorService previousThreadPoolExecutor =
    +          asyncCloseExecutors.putIfAbsent(address, executorService);
    +
    +      if (previousThreadPoolExecutor != null) {
    +        executorService.shutdownNow();
    +        return previousThreadPoolExecutor;
           }
    -      return pool;
         }
    +    return executorService;
    +  }
    +
    +  /**
    +   * @deprecated this method is to be used for pre 1.8 jdk.
    +   */
    +  @Deprecated
    +  private void createThreadPool() {
    +    ExecutorService executorService;
    +    final ThreadGroup threadGroup =
    +        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
    +    ThreadFactory threadFactory = new ThreadFactory() {
    +      public Thread newThread(final Runnable command) {
    +        Thread thread = new Thread(threadGroup, command);
    +        thread.setDaemon(true);
    +        return thread;
    +      }
    +    };
    +
    +    executorService = new ThreadPoolExecutor(asyncClosePoolMaxThreads, asyncClosePoolMaxThreads,
    +        asyncCloseWaitTime, asyncCloseWaitUnits, new LinkedBlockingQueue<>(), threadFactory);
       }
     
       /**
        * Call this method if you know all the resources in the closer for the given address are no
        * longer needed. Currently a thread pool is kept for each address and if you know that an address
        * no longer needs its pool then you should call this method.
        */
    -  public void releaseResourcesForAddress(String address) {
    -    synchronized (asyncCloseExecutors) {
    -      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
    -      if (pool != null) {
    -        pool.shutdown();
    -        asyncCloseExecutors.remove(address);
    -      }
    -    }
    -  }
     
    -  private boolean isClosed() {
    -    synchronized (asyncCloseExecutors) {
    -      return this.closed;
    +  public void releaseResourcesForAddress(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.remove(address);
    +    if (executorService != null) {
    +      executorService.shutdown();
    --- End diff --
    
    Is this call (`executorService.shutdown()`) reentrant?



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by kohlmu-pivotal <gi...@git.apache.org>.
Github user kohlmu-pivotal commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133238417
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -144,35 +156,22 @@ private boolean isClosed() {
        * called then the asyncClose will be done synchronously.
        */
       public void close() {
    -    synchronized (asyncCloseExecutors) {
    +    synchronized (closed) {
           if (!this.closed) {
             this.closed = true;
    -        for (ThreadPoolExecutor pool : asyncCloseExecutors.values()) {
    -          pool.shutdown();
    -        }
    -        asyncCloseExecutors.clear();
    +      } else {
    +        return;
           }
         }
    +    for (ExecutorService executorService : asyncCloseExecutors.values()) {
    +      executorService.shutdown();
    +      asyncCloseExecutors.clear();
    --- End diff --
    
    Good catch.
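
The reviewer comment being answered is not part of this message. One issue visible in the quoted diff is that `asyncCloseExecutors.clear()` sits inside the `for` loop, so the map can be emptied while it is still being iterated and some pools may never be shut down. A minimal sketch of one possible shape for `close()` follows, assuming an `AtomicBoolean` flag in place of `synchronized (closed)`; the class `CloseOnceSketch` and its `register` and `poolCount` helpers are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for SocketCloser; only the close-once logic is sketched.
class CloseOnceSketch {
  private final ConcurrentHashMap<String, ExecutorService> asyncCloseExecutors =
      new ConcurrentHashMap<>();
  private final AtomicBoolean closed = new AtomicBoolean(false);

  /** Helper for the sketch: creates or returns the pool for an address. */
  ExecutorService register(String address) {
    return asyncCloseExecutors.computeIfAbsent(address,
        ignored -> Executors.newSingleThreadExecutor());
  }

  /** Returns true only for the single call that actually performed the close. */
  boolean close() {
    // compareAndSet lets exactly one caller past this point, without a lock.
    if (!closed.compareAndSet(false, true)) {
      return false;
    }
    for (ExecutorService executorService : asyncCloseExecutors.values()) {
      executorService.shutdown();
    }
    // Clear once, after every pool has been asked to shut down.
    asyncCloseExecutors.clear();
    return true;
  }

  int poolCount() {
    return asyncCloseExecutors.size();
  }
}
```

This sketch does not stop a pool from being registered after `close()` has run; the real class would still need to check its closed flag before handing out executors.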



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by bschuchardt <gi...@git.apache.org>.
Github user bschuchardt commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133325744
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -96,46 +99,55 @@ public int getMaxThreads() {
         return this.asyncClosePoolMaxThreads;
       }
     
    -  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
    -    synchronized (asyncCloseExecutors) {
    -      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
    -      if (pool == null) {
    -        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
    -        ThreadFactory tf = new ThreadFactory() {
    -          public Thread newThread(final Runnable command) {
    -            Thread thread = new Thread(tg, command);
    -            thread.setDaemon(true);
    -            return thread;
    -          }
    -        };
    -        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    -        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads,
    -            this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
    -        pool.allowCoreThreadTimeOut(true);
    -        asyncCloseExecutors.put(address, pool);
    +  private ExecutorService getAsyncThreadExecutor(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.get(address);
    +    if (executorService == null) {
    +      // To be used for pre-1.8 jdk releases.
    +      // createThreadPool();
    +
    +      executorService = Executors.newWorkStealingPool(asyncClosePoolMaxThreads);
    +
    +      ExecutorService previousThreadPoolExecutor =
    +          asyncCloseExecutors.putIfAbsent(address, executorService);
    +
    +      if (previousThreadPoolExecutor != null) {
    +        executorService.shutdownNow();
    +        return previousThreadPoolExecutor;
           }
    -      return pool;
         }
    +    return executorService;
    +  }
    +
    +  /**
    +   * @deprecated this method is to be used for pre 1.8 jdk.
    +   */
    +  @Deprecated
    +  private void createThreadPool() {
    +    ExecutorService executorService;
    +    final ThreadGroup threadGroup =
    +        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
    +    ThreadFactory threadFactory = new ThreadFactory() {
    +      public Thread newThread(final Runnable command) {
    +        Thread thread = new Thread(threadGroup, command);
    +        thread.setDaemon(true);
    +        return thread;
    +      }
    +    };
    +
    +    executorService = new ThreadPoolExecutor(asyncClosePoolMaxThreads, asyncClosePoolMaxThreads,
    +        asyncCloseWaitTime, asyncCloseWaitUnits, new LinkedBlockingQueue<>(), threadFactory);
       }
     
       /**
        * Call this method if you know all the resources in the closer for the given address are no
        * longer needed. Currently a thread pool is kept for each address and if you know that an address
        * no longer needs its pool then you should call this method.
        */
    -  public void releaseResourcesForAddress(String address) {
    -    synchronized (asyncCloseExecutors) {
    -      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
    -      if (pool != null) {
    -        pool.shutdown();
    -        asyncCloseExecutors.remove(address);
    -      }
    -    }
    -  }
     
    -  private boolean isClosed() {
    -    synchronized (asyncCloseExecutors) {
    -      return this.closed;
    +  public void releaseResourcesForAddress(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.remove(address);
    +    if (executorService != null) {
    +      executorService.shutdown();
    --- End diff --
    
    When the cache closes it invokes ConnectionTable.close() which closes each connection and _afterwards_ closes its SocketCloser.  All socket closing in this case will be performed asynchronously.
    
    The same is done in CacheClientNotifier.  All dispatching sockets are closed and then the SocketCloser is closed.
    
    Client proxy and connection table receiver threads are closed by shutting down their executors.
    
    Both of these behaviors are essential if the auto-reconnect mechanism is going to work properly.  Otherwise shutdown of the old cache may hang waiting on the keepalive timeout.
    
    I think extensive testing of this change is needed.  Network partition detection and auto-reconnect tests need to be run to ensure that the cache can be properly closed under harsh conditions.
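
The ordering described above (queue the socket closes first, then close the closer) relies on `ExecutorService.shutdown()` returning immediately while still running tasks that were already submitted. A minimal sketch of that behavior with plain `java.util.concurrent` types follows; it does not use Geode's `ConnectionTable` or `SocketCloser`, and the class name and the latch standing in for a slow socket close are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; illustrates executor semantics only.
class ShutdownOrderSketch {
  /** Queues one simulated socket close, shuts the pool down, then checks the close still ran. */
  static boolean queuedCloseSurvivesShutdown() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    AtomicBoolean socketClosed = new AtomicBoolean(false);
    CountDownLatch release = new CountDownLatch(1);

    pool.execute(() -> {
      try {
        release.await(); // stands in for a slow socket close
        socketClosed.set(true);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    // shutdown() does not wait and does not cancel the task submitted above.
    pool.shutdown();
    boolean closedBeforeRelease = socketClosed.get();

    release.countDown();
    try {
      // Bound the wait so a stuck close cannot hang the caller indefinitely.
      boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
      return !closedBeforeRelease && terminated && socketClosed.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

`shutdownNow()` would instead interrupt the running task and discard queued ones, which is why the choice between the two calls matters for the shutdown path discussed here.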



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by hiteshk25 <gi...@git.apache.org>.
Github user hiteshk25 commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r133251379
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -96,46 +99,55 @@ public int getMaxThreads() {
         return this.asyncClosePoolMaxThreads;
       }
     
    -  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
    -    synchronized (asyncCloseExecutors) {
    -      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
    -      if (pool == null) {
    -        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
    -        ThreadFactory tf = new ThreadFactory() {
    -          public Thread newThread(final Runnable command) {
    -            Thread thread = new Thread(tg, command);
    -            thread.setDaemon(true);
    -            return thread;
    -          }
    -        };
    -        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    -        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads,
    -            this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
    -        pool.allowCoreThreadTimeOut(true);
    -        asyncCloseExecutors.put(address, pool);
    +  private ExecutorService getAsyncThreadExecutor(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.get(address);
    +    if (executorService == null) {
    +      // To be used for pre-1.8 jdk releases.
    +      // createThreadPool();
    +
    +      executorService = Executors.newWorkStealingPool(asyncClosePoolMaxThreads);
    +
    +      ExecutorService previousThreadPoolExecutor =
    +          asyncCloseExecutors.putIfAbsent(address, executorService);
    +
    +      if (previousThreadPoolExecutor != null) {
    +        executorService.shutdownNow();
    +        return previousThreadPoolExecutor;
           }
    -      return pool;
         }
    +    return executorService;
    +  }
    +
    +  /**
    +   * @deprecated this method is to be used for pre 1.8 jdk.
    +   */
    +  @Deprecated
    +  private void createThreadPool() {
    +    ExecutorService executorService;
    +    final ThreadGroup threadGroup =
    +        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
    +    ThreadFactory threadFactory = new ThreadFactory() {
    +      public Thread newThread(final Runnable command) {
    +        Thread thread = new Thread(threadGroup, command);
    +        thread.setDaemon(true);
    +        return thread;
    +      }
    +    };
    +
    +    executorService = new ThreadPoolExecutor(asyncClosePoolMaxThreads, asyncClosePoolMaxThreads,
    +        asyncCloseWaitTime, asyncCloseWaitUnits, new LinkedBlockingQueue<>(), threadFactory);
       }
     
       /**
        * Call this method if you know all the resources in the closer for the given address are no
        * longer needed. Currently a thread pool is kept for each address and if you know that an address
        * no longer needs its pool then you should call this method.
        */
    -  public void releaseResourcesForAddress(String address) {
    -    synchronized (asyncCloseExecutors) {
    -      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
    -      if (pool != null) {
    -        pool.shutdown();
    -        asyncCloseExecutors.remove(address);
    -      }
    -    }
    -  }
     
    -  private boolean isClosed() {
    -    synchronized (asyncCloseExecutors) {
    -      return this.closed;
    +  public void releaseResourcesForAddress(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.remove(address);
    +    if (executorService != null) {
    +      executorService.shutdown();
    --- End diff --
    
    We close the socket in a new thread when we are not closing the cache. Otherwise, we close all sockets inline, correct? Given that, how often are multiple clients closing their connections at the same time? I feel cache.close() may take some time if we close everything inline, but there may be a good reason for doing that.



[GitHub] geode pull request #702: GEODE-3416: Reduce synchronization blockages in Soc...

Posted by kohlmu-pivotal <gi...@git.apache.org>.
Github user kohlmu-pivotal commented on a diff in the pull request:

    https://github.com/apache/geode/pull/702#discussion_r134328545
  
    --- Diff: geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java ---
    @@ -96,46 +99,55 @@ public int getMaxThreads() {
         return this.asyncClosePoolMaxThreads;
       }
     
    -  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
    -    synchronized (asyncCloseExecutors) {
    -      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
    -      if (pool == null) {
    -        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
    -        ThreadFactory tf = new ThreadFactory() {
    -          public Thread newThread(final Runnable command) {
    -            Thread thread = new Thread(tg, command);
    -            thread.setDaemon(true);
    -            return thread;
    -          }
    -        };
    -        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    -        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads,
    -            this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
    -        pool.allowCoreThreadTimeOut(true);
    -        asyncCloseExecutors.put(address, pool);
    +  private ExecutorService getAsyncThreadExecutor(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.get(address);
    +    if (executorService == null) {
    +      // To be used for pre-1.8 jdk releases.
    +      // createThreadPool();
    +
    +      executorService = Executors.newWorkStealingPool(asyncClosePoolMaxThreads);
    +
    +      ExecutorService previousThreadPoolExecutor =
    +          asyncCloseExecutors.putIfAbsent(address, executorService);
    +
    +      if (previousThreadPoolExecutor != null) {
    +        executorService.shutdownNow();
    +        return previousThreadPoolExecutor;
           }
    -      return pool;
         }
    +    return executorService;
    +  }
    +
    +  /**
    +   * @deprecated this method is to be used for pre 1.8 jdk.
    +   */
    +  @Deprecated
    +  private void createThreadPool() {
    +    ExecutorService executorService;
    +    final ThreadGroup threadGroup =
    +        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
    +    ThreadFactory threadFactory = new ThreadFactory() {
    +      public Thread newThread(final Runnable command) {
    +        Thread thread = new Thread(threadGroup, command);
    +        thread.setDaemon(true);
    +        return thread;
    +      }
    +    };
    +
    +    executorService = new ThreadPoolExecutor(asyncClosePoolMaxThreads, asyncClosePoolMaxThreads,
    +        asyncCloseWaitTime, asyncCloseWaitUnits, new LinkedBlockingQueue<>(), threadFactory);
       }
     
       /**
        * Call this method if you know all the resources in the closer for the given address are no
        * longer needed. Currently a thread pool is kept for each address and if you know that an address
        * no longer needs its pool then you should call this method.
        */
    -  public void releaseResourcesForAddress(String address) {
    -    synchronized (asyncCloseExecutors) {
    -      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
    -      if (pool != null) {
    -        pool.shutdown();
    -        asyncCloseExecutors.remove(address);
    -      }
    -    }
    -  }
     
    -  private boolean isClosed() {
    -    synchronized (asyncCloseExecutors) {
    -      return this.closed;
    +  public void releaseResourcesForAddress(String address) {
    +    ExecutorService executorService = asyncCloseExecutors.remove(address);
    +    if (executorService != null) {
    +      executorService.shutdown();
    --- End diff --
    
    @bschuchardt In behavior, this code is no different from the original implementation. The differences are that the scope of the synchronization block has been reduced and that the thread pool is now created with `Executors.newWorkStealingPool`. This pool should be somewhat more efficient than the original implementation and avoids some of the initial startup cost of a ThreadPoolExecutor.
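
The lock-free lookup in the quoted diff follows a common get / `putIfAbsent` pattern. The following is a minimal sketch of it outside Geode, assuming a `ConcurrentHashMap` field; the class name `PerAddressExecutors` and the method `forAddress` are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class mirroring the shape of getAsyncThreadExecutor in the diff.
class PerAddressExecutors {
  private final ConcurrentHashMap<String, ExecutorService> executors = new ConcurrentHashMap<>();
  private final int parallelism;

  PerAddressExecutors(int parallelism) {
    this.parallelism = parallelism;
  }

  ExecutorService forAddress(String address) {
    // Fast path: no locking when the pool already exists.
    ExecutorService existing = executors.get(address);
    if (existing != null) {
      return existing;
    }
    ExecutorService created = Executors.newWorkStealingPool(parallelism);
    // putIfAbsent returns the pool registered by a racing thread, or null if we won.
    ExecutorService raced = executors.putIfAbsent(address, created);
    if (raced != null) {
      created.shutdownNow(); // discard our unused pool
      return raced;
    }
    return created;
  }
}
```

On JDK 1.8 and later, `ConcurrentHashMap.computeIfAbsent` would be an alternative that avoids creating a pool only to discard it when a race is lost, at the cost of running the factory while the map bin is locked.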

