Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2021/02/09 00:43:14 UTC

[GitHub] [helix] xyuanlu opened a new pull request #1645: Use thread pool for batched call back handling events

xyuanlu opened a new pull request #1645:
URL: https://github.com/apache/helix/pull/1645


   ### Issues
   
   - [X] My PR addresses the following Helix issues and references them in the PR description:
   
   #1642 Too many thread causing memory issue when enable batch mode in CallbackHandler 
   
   ### Description
   
   - [X] Here are some details about my PR, including screenshots of any UI changes:
   
   After enabling batch mode in production, we noticed that the thread count was high.
   The reason is that when batch mode is enabled, each CallbackHandler has a dedicated thread (CallbackProcessor) that handles all of its callbacks. Originally, with batch mode disabled, the _eventThread in zkClient handles all callbacks.
   
   ### Tests
   
   - [X] The following tests are written for this issue:
   
   NA
   
   - The following is the result of the "mvn test" command on the appropriate module:
   
   (If CI test fails due to known issue, please specify the issue and test PR locally. Then copy & paste the result of "mvn test" to here.)
   
   ### Documentation (Optional)
   
   - In case of new functionality, my PR adds documentation in the following wiki page:
   
   (Link the GitHub wiki you added)
   
   ### Commits
   
   - My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on pull request #1645:
URL: https://github.com/apache/helix/pull/1645#issuecomment-793163591


   The change is ready to be merged in. Approved by @jiajunwang 
   
   Final commit message:
   Use thread pool for batched call back handling events
   
   Originally, when batch mode is enabled, each CallbackHandler has a dedicated thread (CallbackProcessor) that handles all callbacks. This change adds a fixed-size thread pool per Helix manager to handle all callback events. Thus the total thread count stays low when batch mode is enabled.
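The design in the commit message can be illustrated with a minimal sketch. This is not the PR's code. Many handlers share one fixed-size pool, and each handler hands at most one event at a time to that pool, so its own events still run in order. The class names `SerialHandlerExecutor` and `SharedPoolDemo` are hypothetical, the pool size of 2 is arbitrary, and a plain FIFO queue stands in for the PR's `DedupEventBlockingQueue`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical per-handler executor: events are queued locally and handed to the
// shared pool one at a time, so a single handler never runs two events concurrently.
class SerialHandlerExecutor {
  private final ExecutorService sharedPool;
  private final Queue<Runnable> pending = new ArrayDeque<>(); // guarded by 'this'
  private boolean running = false;                            // guarded by 'this'

  SerialHandlerExecutor(ExecutorService sharedPool) {
    this.sharedPool = sharedPool;
  }

  synchronized void submit(Runnable event) {
    pending.add(event);
    if (!running) {
      scheduleNext(); // first event, or the previous one has already finished
    }
  }

  // Caller must hold the monitor of 'this'.
  private void scheduleNext() {
    Runnable next = pending.poll();
    running = next != null;
    if (next == null) {
      return;
    }
    sharedPool.execute(() -> {
      try {
        next.run();
      } finally {
        synchronized (this) { // inside the lambda, 'this' is the SerialHandlerExecutor
          scheduleNext();
        }
      }
    });
  }
}

class SharedPoolDemo {
  // Runs 'events' events on each of 'handlers' handlers that share a 2-thread pool.
  // Returns true if every handler observed its own events in submission order.
  static boolean runDemo(int handlers, int events) {
    ExecutorService shared = Executors.newFixedThreadPool(2);
    CountDownLatch done = new CountDownLatch(handlers * events);
    List<List<Integer>> logs = new ArrayList<>();
    for (int h = 0; h < handlers; h++) {
      List<Integer> log = Collections.synchronizedList(new ArrayList<>());
      logs.add(log);
      SerialHandlerExecutor executor = new SerialHandlerExecutor(shared);
      for (int e = 0; e < events; e++) {
        final int id = e;
        executor.submit(() -> {
          log.add(id);
          done.countDown();
        });
      }
    }
    try {
      done.await(); // every event has run, so every local queue is empty
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      shared.shutdown();
    }
    for (List<Integer> log : logs) {
      if (log.size() != events) {
        return false;
      }
      for (int i = 0; i < events; i++) {
        if (log.get(i) != i) {
          return false;
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(runDemo(50, 20)); // true: 50 handlers served by only 2 pool threads
  }
}
```

The thread count is bounded by the pool size, not by the number of handlers. Per-handler ordering is kept because the next event is submitted only from the `finally` block of the previous one.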
   




[GitHub] [helix] jiajunwang commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r584021477



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", managerHash))
+          .build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

Review comment:
       I cannot believe that I never noticed this before. Thanks for raising this. I think the original solution will work, but some details need more thought.
   
   Alternatively, we could do what this post suggests: https://stackoverflow.com/questions/15485840/threadpoolexecutor-with-unbounded-queue-not-creating-new-threads
   I have tested that solution and it works well. Admittedly, it is more complex, so it's up to you : )
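The linked post covers a known `ThreadPoolExecutor` behavior. With an unbounded `LinkedBlockingQueue`, `offer()` never fails, so the pool never grows past `corePoolSize`. Threads beyond the core are created only when the queue rejects a task. A minimal sketch of the effect follows. The class name is hypothetical, and the tasks block on a latch so that every thread stays busy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueuePitfall {
  // Submits 'tasks' blocking tasks and reports the largest pool size ever reached.
  static int largestPoolSize(int core, int max, int tasks) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 1, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>()); // unbounded: extra tasks are always queued
    CountDownLatch release = new CountDownLatch(1);
    for (int i = 0; i < tasks; i++) {
      pool.execute(() -> {
        try {
          release.await(); // keep the worker busy until we have measured
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    int largest = pool.getLargestPoolSize();
    release.countDown();
    pool.shutdown(); // queued tasks still run, then the threads exit
    return largest;
  }

  public static void main(String[] args) {
    System.out.println(largestPoolSize(1, 5, 10)); // 1: max=5 is never used
    System.out.println(largestPoolSize(5, 5, 10)); // 5: core == max
  }
}
```

Setting core equal to max avoids the problem without the custom queue and rejection handler that the post proposes.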






[GitHub] [helix] xyuanlu commented on pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on pull request #1645:
URL: https://github.com/apache/helix/pull/1645#issuecomment-776277420


   > I think we have a pending discussion on how to utilize the thread pool, right? Have we reached any agreement on the design?
   
   Thanks for the reply. Yes, I posted the PR so that it is easier to get comments and advice. I will schedule an offline discussion as well. 




[GitHub] [helix] xyuanlu commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r583326421



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();

Review comment:
       This downgrades the lock to a read lock so that other read requests won't be blocked. 
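The `ReentrantReadWriteLock` documentation describes downgrading as a two-step sequence. First acquire the read lock while still holding the write lock, then release the write lock. Upgrading from read to write is not possible. The downgrade only takes effect once the write lock is actually released. In the excerpt quoted above, no write-lock release is visible after `_lock.readLock().lock()`. The later revision of this file adds `_lock.writeLock().unlock()` right after it.

A minimal sketch of the documented pattern follows. The class is hypothetical, and a `String` stands in for the thread pool.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical get-or-create map guarded by a ReentrantReadWriteLock.
class LockDowngradeDemo {
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Map<Integer, String> pools = new HashMap<>();
  boolean heldOnlyReadAfterDowngrade; // recorded for illustration only

  String getOrCreate(int key) {
    lock.readLock().lock();
    try {
      String existing = pools.get(key);
      if (existing != null) {
        return existing; // fast path: read lock only
      }
    } finally {
      lock.readLock().unlock(); // a read lock cannot be upgraded, so release it first
    }

    lock.writeLock().lock();
    try {
      // Re-check under the write lock: another thread may have created the entry.
      pools.computeIfAbsent(key, k -> "pool-" + k);
      lock.readLock().lock(); // downgrade step 1: take the read lock while writing
    } finally {
      lock.writeLock().unlock(); // downgrade step 2: other readers may now proceed
    }
    try {
      heldOnlyReadAfterDowngrade =
          !lock.isWriteLockedByCurrentThread() && lock.getReadHoldCount() == 1;
      return pools.get(key);
    } finally {
      lock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    LockDowngradeDemo demo = new LockDowngradeDemo();
    System.out.println(demo.getOrCreate(1));             // pool-1 (created, then downgraded)
    System.out.println(demo.heldOnlyReadAfterDowngrade); // true
    System.out.println(demo.getOrCreate(1));             // pool-1 (fast path)
  }
}
```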

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", managerHash))
+          .build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

Review comment:
       This is exactly why I am using ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, ...). Since we set `allowCoreThreadTimeOut` to true, a thread is terminated once it has been idle for 1 second. 
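The effect of `allowCoreThreadTimeOut(true)` on a pool with core == max can be shown with a small sketch. The class name is hypothetical, and the keep-alive values are shortened for illustration. Without the flag, the core threads stay alive indefinitely. With it, idle threads exit after the keep-alive time, which must be greater than zero when the flag is enabled.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
  // Same shape as ThreadPoolExecutor(5, 5, 1, SECONDS, ...): core == max.
  static ThreadPoolExecutor newPool(int size, long keepAliveMs, boolean coreTimeout) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(size, size, keepAliveMs,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    pool.allowCoreThreadTimeOut(coreTimeout); // true: idle core threads may exit
    return pool;
  }

  // Starts 'size' threads with trivial tasks, stays idle for idleMs,
  // then reports how many threads are still alive.
  static int poolSizeAfterIdle(int size, long keepAliveMs, boolean coreTimeout, long idleMs) {
    ThreadPoolExecutor pool = newPool(size, keepAliveMs, coreTimeout);
    for (int i = 0; i < size; i++) {
      pool.execute(() -> { }); // each call below core size starts a new thread
    }
    try {
      Thread.sleep(idleMs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    int live = pool.getPoolSize();
    pool.shutdown();
    return live;
  }

  public static void main(String[] args) {
    System.out.println(poolSizeAfterIdle(5, 100, false, 500)); // 5: core threads never time out
    // With the flag set, idle threads are expected to have exited well within 1 s.
    System.out.println(poolSizeAfterIdle(5, 100, true, 1000));
  }
}
```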






[GitHub] [helix] xyuanlu commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r588721248



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventThreadPoolFactory.java
##########
@@ -0,0 +1,98 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventThreadPoolFactory {
+  private static final int CALLBACK_EVENT_THREAD_POOL_SIZE = 10;
+  private static final int CALLBACK_EVENT_THREAD_POOL_TTL_MINUTE = 3;
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int hash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(hash)) {
+      result = _managerToCallBackThreadPoolMap.get(hash);
+      _callBackEventProcessorCountPerThreadPool.get(hash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(hash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int hash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(hash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      _lock.writeLock().unlock();
+      result = _managerToCallBackThreadPoolMap.get(hash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", hash)).build();
+      result = new ThreadPoolExecutor(CALLBACK_EVENT_THREAD_POOL_SIZE,

Review comment:
       Yes, I forgot to add it back.






[GitHub] [helix] xyuanlu commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r580648551



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,76 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// returns a thread pool object given a HelixManager identifier
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+
+  public static ThreadPoolExecutor getThreadPool(int mapHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result;
+    if (_managerToCallBackThreadPoolMap.containsKey(mapHash)) {
+      result = _managerToCallBackThreadPoolMap.get(mapHash);
+      _lock.readLock().unlock();
+    } else {
+      _lock.readLock().unlock();
+      result = getAndCreateThreadPool(mapHash);
+    }
+    return result;
+  }
+
+  public static ThreadPoolExecutor getAndCreateThreadPool(int mapHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(mapHash)) {
+      // downgrade to read lock
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(mapHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", mapHash)).build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

Review comment:
       TFTR. Updated.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,76 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// returns a thread pool object given a HelixManager identifier
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+public class CallbackEventTPFactory {

Review comment:
       TFTR. The thread pool maps one-to-one to a HelixManager, not to an executor. I think a factory provides a layer of abstraction over thread pool creation, so each CallbackEventExecutor doesn't need to know whether the thread pool for its HelixManager has already been created. CallbackEventExecutor can just call `getOrCreateThreadPool` in its constructor. 
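A factory like the one described could also be written with a single `synchronized` monitor instead of a `ReadWriteLock`. The lookup happens once per `CallbackEventExecutor` construction, not once per event, so contention should be low.

The sketch below is a hypothetical simplification, not the PR's code:

- The class and method names are illustrative.
- The pool size of 10 mirrors `CALLBACK_EVENT_THREAD_POOL_SIZE` from the later revision.
- `release`, which shuts a pool down when its last user releases it, is an assumption about how a per-pool user count might be used.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical factory: one pool per manager key, reference-counted so the pool
// can be shut down once no executor uses it anymore.
class PerManagerPoolFactory {
  private static final int POOL_SIZE = 10; // assumption: mirrors CALLBACK_EVENT_THREAD_POOL_SIZE
  private static final Map<Integer, ExecutorService> POOLS = new HashMap<>();
  private static final Map<Integer, Integer> USERS = new HashMap<>();

  // Called once per executor (e.g. from its constructor), so one monitor is cheap enough.
  static synchronized ExecutorService getOrCreate(int managerKey) {
    ExecutorService pool =
        POOLS.computeIfAbsent(managerKey, k -> Executors.newFixedThreadPool(POOL_SIZE));
    USERS.merge(managerKey, 1, Integer::sum);
    return pool;
  }

  // Hypothetical counterpart: shuts the pool down when its last user releases it.
  static synchronized void release(int managerKey) {
    Integer users = USERS.get(managerKey);
    if (users == null) {
      return; // unknown key: nothing to release
    }
    if (users > 1) {
      USERS.put(managerKey, users - 1);
    } else {
      USERS.remove(managerKey);
      POOLS.remove(managerKey).shutdown();
    }
  }

  public static void main(String[] args) {
    ExecutorService a = getOrCreate(42);
    ExecutorService b = getOrCreate(42);
    System.out.println(a == b);         // true: same key, same pool
    release(42);
    System.out.println(a.isShutdown()); // false: one user left
    release(42);
    System.out.println(a.isShutdown()); // true: last user released
  }
}
```

Since the executor passes `manager.hashCode()` as the key, two managers whose hash codes collide would share one pool.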






[GitHub] [helix] jiajunwang commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r587000663



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventThreadPoolFactory.java
##########
@@ -0,0 +1,98 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventThreadPoolFactory {
+  private static final int CALLBACK_EVENT_THREAD_POOL_SIZE = 10;
+  private static final int CALLBACK_EVENT_THREAD_POOL_TTL_MINUTE = 3;
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int hash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(hash)) {
+      result = _managerToCallBackThreadPoolMap.get(hash);
+      _callBackEventProcessorCountPerThreadPool.get(hash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(hash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int hash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(hash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      _lock.writeLock().unlock();
+      result = _managerToCallBackThreadPoolMap.get(hash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", hash)).build();
+      result = new ThreadPoolExecutor(CALLBACK_EVENT_THREAD_POOL_SIZE,

Review comment:
       Did you remove the option that allows core threads to time out? In that case, it is just a fixed-size thread pool, right?
   I'm fine either way, but if I understand the intention correctly, the current code can be simplified.
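Without `allowCoreThreadTimeOut(true)`, a `ThreadPoolExecutor` with core == max and an unbounded queue behaves like a standard fixed-size pool, and the keep-alive time is never used. `Executors.newFixedThreadPool(size, threadFactory)` builds exactly that. A minimal sketch of the simplification follows, assuming a plain fixed-size pool is the intention. The class name and thread-name prefix are hypothetical, and a plain JDK `ThreadFactory` stands in for the Guava `ThreadFactoryBuilder` used in the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolSimplification {
  // Plain-JDK stand-in for the Guava ThreadFactoryBuilder used in the PR.
  static ThreadFactory named(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread t = new Thread(runnable, prefix + counter.getAndIncrement());
      t.setDaemon(true); // assumption: callback threads should not keep the JVM alive
      return t;
    };
  }

  // Same as new ThreadPoolExecutor(size, size, 0L, MILLISECONDS,
  // new LinkedBlockingQueue<Runnable>(), factory); core threads never time out.
  static ExecutorService newCallbackPool(int size, String prefix) {
    return Executors.newFixedThreadPool(size, named(prefix));
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = newCallbackPool(10, "CallbackHandlerExecutorService-");
    String name = pool.submit(() -> Thread.currentThread().getName()).get();
    System.out.println(name); // CallbackHandlerExecutorService-0
    pool.shutdown();
  }
}
```

If idle threads should be released instead, the explicit constructor plus `allowCoreThreadTimeOut(true)` (as in the earlier five-thread revision) is the variant to keep.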

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
##########
@@ -0,0 +1,128 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.common.DedupEventBlockingQueue;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Each batch mode is enabled, each CallbackHandler has a CallbackEventTPExecutor.
+ * It has a dedupe queue for pending call back event. Pending call back event will
+ * be submitted to a thread pool one at a time when
+ * 1. This is the first ever call back event for the callBackHandler, or
+ * 2. The previous call back event handling process is finished in thread pool.
+ */
+
+public class CallbackEventExecutor {
+  private static Logger logger = LoggerFactory.getLogger(CallbackHandler.class);
+
+  private DedupEventBlockingQueue<NotificationContext.Type, NotificationContext>
+      _callBackEventQueue;
+  private final HelixManager _manager;
+  private Future _futureCallBackProcessEvent = null;
+  private ThreadPoolExecutor _threadPoolExecutor;
+  private boolean _isShutdown = false;
+
+  public CallbackEventExecutor(HelixManager manager) {
+    _callBackEventQueue = new DedupEventBlockingQueue<>();
+    _manager = manager;
+    _threadPoolExecutor = CallbackEventThreadPoolFactory.getOrCreateThreadPool(manager.hashCode());
+  }
+
+  class CallbackProcessor implements Runnable {
+    private final CallbackHandler _handler;
+    protected final String _processorName;
+    private final NotificationContext _event;
+
+    public CallbackProcessor(CallbackHandler handler, NotificationContext event) {
+      _processorName = _manager.getClusterName() + "CallbackProcessor@" + Integer

Review comment:
       Adding a "-" before "CallbackProcessor" would make the output look better, I guess.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
##########
@@ -0,0 +1,128 @@
+package org.apache.helix.manager.zk;
+
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.common.DedupEventBlockingQueue;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Each batch mode is enabled, each CallbackHandler has a CallbackEventTPExecutor.
+ * It has a dedupe queue for pending call back event. Pending call back event will
+ * be submitted to a thread pool one at a time when
+ * 1. This is the first ever call back event for the callBackHandler, or
+ * 2. The previous call back event handling process is finished in thread pool.
+ */
+
+public class CallbackEventExecutor {
+  private static Logger logger = LoggerFactory.getLogger(CallbackHandler.class);
+
+  private DedupEventBlockingQueue<NotificationContext.Type, NotificationContext>
+      _callBackEventQueue;
+  private final HelixManager _manager;
+  private Future _futureCallBackProcessEvent = null;
+  private ThreadPoolExecutor _threadPoolExecutor;
+  private boolean _isShutdown = false;
+
+  public CallbackEventExecutor(HelixManager manager) {
+    _callBackEventQueue = new DedupEventBlockingQueue<>();
+    _manager = manager;
+    _threadPoolExecutor = CallbackEventThreadPoolFactory.getOrCreateThreadPool(manager.hashCode());
+  }
+
+  class CallbackProcessor implements Runnable {
+    private final CallbackHandler _handler;
+    protected final String _processorName;
+    private final NotificationContext _event;
+
+    public CallbackProcessor(CallbackHandler handler, NotificationContext event) {
+      _processorName = _manager.getClusterName() + "CallbackProcessor@" + Integer
+          .toHexString(handler.hashCode());
+      _handler = handler;
+      _event = event;
+    }
+
+    @Override
+    public void run() {
+      try {
+        _handler.invoke(_event);
+      } catch (ZkInterruptedException e) {
+        logger.warn(_processorName + " thread caught a ZK connection interrupt", e);
+      } catch (ThreadDeath death) {
+        logger.error(_processorName + " thread dead " + _processorName, death);
+      } catch (Throwable t) {
+        logger.error(_processorName + " thread failed while running " + _processorName, t);
+      }
+      submitPendingHandleCallBackEventToManagerThreadPool(_handler);
+    }
+  }
+
+  public void submitEventToExecutor(NotificationContext.Type eventType, NotificationContext event,
+      CallbackHandler handler) {
+    synchronized (_callBackEventQueue) {
+      if (_isShutdown) {
+        logger.error("Failed to process callback. CallbackEventExecutor is already shut down.");
+      }
+      if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) {
+        _futureCallBackProcessEvent =
+            _threadPoolExecutor.submit(new CallbackProcessor(handler, event));
+      } else {
+        _callBackEventQueue.put(eventType, event);
+      }
+    }
+  }
+
+  private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler handler) {
+    synchronized (_callBackEventQueue) {
+      if (_callBackEventQueue.size() != 0) {
+        try {
+          NotificationContext event = _callBackEventQueue.take();
+          _futureCallBackProcessEvent =
+              _threadPoolExecutor.submit(new CallbackProcessor(handler, event));
+        } catch (InterruptedException e) {
+          e.printStackTrace();

Review comment:
       Log the error?
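One common way to act on this suggestion is to log the exception instead of calling `printStackTrace()`. The sketch below also restores the thread's interrupt status so that code further up the stack can still see it. The class is hypothetical and uses `java.util.logging` only to stay self-contained. The PR itself would log through its slf4j `logger`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the pending-event queue; not Helix code.
class PendingEventQueue {
  private static final Logger LOG = Logger.getLogger(PendingEventQueue.class.getName());
  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

  void put(String event) {
    queue.offer(event); // unbounded queue, so offer always succeeds
  }

  // Returns the next pending event, or null if the thread was interrupted.
  String takeNext() {
    try {
      return queue.take();
    } catch (InterruptedException e) {
      // Log instead of printStackTrace(), then restore the interrupt flag
      // so callers further up the stack can still observe it.
      LOG.log(Level.WARNING, "Interrupted while taking a pending callback event", e);
      Thread.currentThread().interrupt();
      return null;
    }
  }
}
```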

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventThreadPoolFactory.java
##########
@@ -0,0 +1,98 @@
+package org.apache.helix.manager.zk;
+
+
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventThreadPoolFactory {
+  private static final int CALLBACK_EVENT_THREAD_POOL_SIZE = 10;
+  private static final int CALLBACK_EVENT_THREAD_POOL_TTL_MINUTE = 3;
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int hash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(hash)) {
+      result = _managerToCallBackThreadPoolMap.get(hash);
+      _callBackEventProcessorCountPerThreadPool.get(hash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(hash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int hash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();

Review comment:
       Better to put the unlock call in a `finally` block to ensure safety.
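Here is a minimal sketch of the suggested shape. It assumes a simplified registry: the hypothetical `PoolRegistry` below omits the PR's per-pool reference counting. Every `lock()` is paired with an `unlock()` in a `finally` block, so an exception thrown while constructing the executor cannot leave the write lock held.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.IntFunction;

// Hypothetical simplified registry for illustration; not Helix code.
class PoolRegistry {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Map<Integer, ExecutorService> pools = new HashMap<>();

  ExecutorService getOrCreate(int key, IntFunction<ExecutorService> factory) {
    // Fast path: look up under the shared read lock.
    lock.readLock().lock();
    try {
      ExecutorService existing = pools.get(key);
      if (existing != null) {
        return existing;
      }
    } finally {
      lock.readLock().unlock();
    }
    // Slow path: create under the exclusive write lock.
    lock.writeLock().lock();
    try {
      // Re-check: another thread may have created the pool in between.
      ExecutorService existing = pools.get(key);
      if (existing != null) {
        return existing;
      }
      ExecutorService created = factory.apply(key); // may throw
      pools.put(key, created);
      return created;
    } finally {
      lock.writeLock().unlock(); // runs on return and on exception
    }
  }

  boolean isWriteLocked() {
    return lock.isWriteLocked();
  }
}
```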




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] xyuanlu commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r583967476



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", managerHash))
+          .build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

Review comment:
       From the Javadoc:
   https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#allowsCoreThreadTimeOut()
   > If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.
   
   With an unbounded queue, core = 1 and max = 5, all new tasks (except the first one) are pushed to the queue, so the thread count always stays at 1. I think the only way to have a dynamically growing and shrinking thread pool with an unbounded queue is to set coreThread == maxThread == X and keepAliveTime = Y, with core threads allowed to time out.
   However, the thread pool will then have 0 idle threads once the timeout has expired. So I decided to change back to a fixed-size thread pool.
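The queueing rule quoted above can be checked with a small sketch. The `UnboundedQueueDemo` class is hypothetical and for illustration only. It fills a pool backed by an unbounded `LinkedBlockingQueue` with blocked tasks and reports how many threads were created. In current OpenJDK implementations, the pool stays at one thread whether `corePoolSize` is 1 or 0. With 0, the executor starts a single thread only because none exist yet. This also bears on the question elsewhere in this thread about setting the core size to 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not Helix code.
class UnboundedQueueDemo {
  // Submits `tasks` blocking tasks to a pool with the given core and max sizes
  // and an unbounded queue; returns the thread count while all tasks are pending.
  static int threadsCreated(int core, int max, int tasks) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 1, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());
    CountDownLatch release = new CountDownLatch(1);
    try {
      for (int i = 0; i < tasks; i++) {
        pool.execute(() -> {
          try {
            release.await(); // keep the worker busy until measuring is done
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      // Beyond corePoolSize (and the single thread started when none exist),
      // an unbounded queue never reports "full", so maximumPoolSize is never reached.
      return pool.getPoolSize();
    } finally {
      release.countDown();
      pool.shutdown();
    }
  }
}
```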






[GitHub] [helix] xyuanlu commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r588919442



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventThreadPoolFactory.java
##########
@@ -0,0 +1,98 @@
+package org.apache.helix.manager.zk;
+
+
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventThreadPoolFactory {
+  private static final int CALLBACK_EVENT_THREAD_POOL_SIZE = 10;
+  private static final int CALLBACK_EVENT_THREAD_POOL_TTL_MINUTE = 3;
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int hash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(hash)) {
+      result = _managerToCallBackThreadPoolMap.get(hash);
+      _callBackEventProcessorCountPerThreadPool.get(hash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(hash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int hash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();

Review comment:
       TFTR. Good point. Updated. 






[GitHub] [helix] jiajunwang commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r588838773



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventThreadPoolFactory.java
##########
@@ -0,0 +1,98 @@
+package org.apache.helix.manager.zk;
+
+
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventThreadPoolFactory {
+  private static final int CALLBACK_EVENT_THREAD_POOL_SIZE = 10;
+  private static final int CALLBACK_EVENT_THREAD_POOL_TTL_MINUTE = 3;
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int hash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(hash)) {
+      result = _managerToCallBackThreadPoolMap.get(hash);
+      _callBackEventProcessorCountPerThreadPool.get(hash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(hash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int hash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();

Review comment:
       I didn't mean that unlock; that one is fine.
   What I meant is that, in general, if an exception is thrown unexpectedly, try-catch-finally ensures the lock is released instead of being held forever. An alternative is some AutoCloseable lock design. In your current code, for example, if creating the new ThreadPoolExecutor fails, the write lock will never be released.
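For the "autoclosable lock design" mentioned above, here is a minimal sketch. The JDK's `Lock` interface does not implement `AutoCloseable`, so a small adapter lets try-with-resources release the lock on every exit path. The `AutoLock` class below is hypothetical and is not a JDK or Helix class.

```java
import java.util.concurrent.locks.Lock;

// Hypothetical adapter (not a JDK or Helix class) that makes a Lock usable
// in try-with-resources.
final class AutoLock implements AutoCloseable {
  private final Lock lock;

  private AutoLock(Lock lock) {
    this.lock = lock;
  }

  // Acquires the lock and returns a handle whose close() releases it.
  static AutoLock acquire(Lock lock) {
    lock.lock();
    return new AutoLock(lock);
  }

  @Override
  public void close() { // narrows the throws clause: no checked exception
    lock.unlock();
  }
}
```

Inside `getOrCreateThreadPoolHelper`, this would read roughly as `try (AutoLock ignored = AutoLock.acquire(_lock.writeLock())) { ... }`.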






[GitHub] [helix] jiajunwang commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r583860325



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", managerHash))
+          .build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

Review comment:
       " when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating."
   
   Since the core pool size is also 5, the pool will keep all 5 threads alive.
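The keep-alive rule quoted above can be observed with a rough, timing-based sketch. The `IdlePoolDemo` class is hypothetical, and the 50 ms keep-alive and 1 s idle wait are arbitrary choices. With `corePoolSize == maximumPoolSize`, idle threads survive past `keepAliveTime`. The idle pool shrinks to zero only after `allowCoreThreadTimeOut(true)` is set.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not Helix code.
class IdlePoolDemo {
  // Starts a 5/5 pool with a 50 ms keep-alive, lets it sit idle for 1 s,
  // and returns how many worker threads are still alive.
  static int poolSizeAfterIdle(boolean allowCoreTimeout) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 50, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>());
    pool.allowCoreThreadTimeOut(allowCoreTimeout);
    pool.prestartAllCoreThreads(); // create all 5 core threads up front
    try {
      Thread.sleep(1000); // idle for well over keepAliveTime
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    int alive = pool.getPoolSize();
    pool.shutdown();
    return alive;
  }
}
```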






[GitHub] [helix] zhangmeng916 merged pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
zhangmeng916 merged pull request #1645:
URL: https://github.com/apache/helix/pull/1645


   




[GitHub] [helix] jiajunwang commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r583861917



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", managerHash))
+          .build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

Review comment:
       This seems like a very strange design; why not just set the core pool size to 0? I'm just curious. The logic seems to work.






[GitHub] [helix] xyuanlu commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r588722451



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventThreadPoolFactory.java
##########
@@ -0,0 +1,98 @@
+package org.apache.helix.manager.zk;
+
+
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventThreadPoolFactory {
+  private static final int CALLBACK_EVENT_THREAD_POOL_SIZE = 10;
+  private static final int CALLBACK_EVENT_THREAD_POOL_TTL_MINUTE = 3;
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int hash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(hash)) {
+      result = _managerToCallBackThreadPoolMap.get(hash);
+      _callBackEventProcessorCountPerThreadPool.get(hash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(hash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int hash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();

Review comment:
       TFTR. 
   Please correct me if I am wrong, but I think moving this unlock call into a finally block won't downgrade the lock to a read lock. For a downgrade, the write lock has to be released right after the read lock is acquired, i.e. before
   `result = _managerToCallBackThreadPoolMap.get(hash);`
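For reference, `ReentrantReadWriteLock` only supports downgrading in one way: acquire the read lock while still holding the write lock, then release the write lock. Releasing the write lock in a `finally` block by itself does not downgrade anything, and upgrading from a read lock to the write lock is not supported at all. The sketch below is a hypothetical, generic registry (not the Helix class) that shows that ordering on a get-or-create path. In this particular factory nothing is read after the map update, so simply releasing the write lock after the lookup would likely be enough; a downgrade only pays off when later reads must see exactly the state the writer just produced.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.IntFunction;

// Hypothetical registry illustrating write-to-read lock downgrading; not Helix code.
class DowngradingRegistry<V> {
  private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
  private final Map<Integer, V> _map = new HashMap<>();

  V getOrCreate(int key, IntFunction<V> creator) {
    // Fast path under the shared read lock.
    _lock.readLock().lock();
    try {
      V existing = _map.get(key);
      if (existing != null) {
        return existing;
      }
    } finally {
      _lock.readLock().unlock();
    }

    // A read lock cannot be upgraded, so release it and take the write lock.
    _lock.writeLock().lock();
    try {
      // Re-check: another thread may have created the value in the meantime.
      if (!_map.containsKey(key)) {
        _map.put(key, creator.apply(key));
      }
      // Downgrade: acquire the read lock BEFORE releasing the write lock.
      _lock.readLock().lock();
    } finally {
      _lock.writeLock().unlock(); // the read lock, if acquired, is still held
    }
    try {
      // No writer can have modified the map between the update and this read.
      return _map.get(key);
    } finally {
      _lock.readLock().unlock();
    }
  }

  int readHoldCount() {
    return _lock.getReadHoldCount();
  }

  boolean isWriteLocked() {
    return _lock.isWriteLocked();
  }
}
```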






[GitHub] [helix] jiajunwang commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r580576099



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -136,32 +135,13 @@
   private HelixCallbackMonitor _monitor;
 
   // TODO: make this be per _manager or per _listener instaed of per callbackHandler -- Lei
-  private AtomicReference<CallbackProcessor> _batchCallbackProcessorRef = new AtomicReference<>();
+  private AtomicReference<CallbackEventTPExecutor> _batchCallbackExecutorrRef = new AtomicReference<>();

Review comment:
       Typo? There is a double 'r' in `_batchCallbackExecutorrRef`.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPExecutor.java
##########
@@ -0,0 +1,117 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.common.DedupEventBlockingQueue;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Each batch mode is enabled, each CallbackHandler has a CallbackEventTPExecutor.
+ * It has a dedupe queue for pending call back event. Pending call back event will
+ * be submitted to a thread pool one at a time when
+ * 1. This is the first ever call back event for the callBackHandler, or
+ * 2. The previous call back event handling process is finished in thread pool.
+ */
+
+public class CallbackEventTPExecutor {

Review comment:
       Does TP mean thread pool here? I think it is redundant in the class name. How about just calling it CallbackEventExecutor?

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,76 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// returns a thread pool object given a HelixManager identifier

Review comment:
       Nit: the class comment should be moved down, directly above the class declaration.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,76 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// returns a thread pool object given a HelixManager identifier
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+
+  public static ThreadPoolExecutor getThreadPool(int mapHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result;
+    if (_managerToCallBackThreadPoolMap.containsKey(mapHash)) {
+      result = _managerToCallBackThreadPoolMap.get(mapHash);
+      _lock.readLock().unlock();
+    } else {
+      _lock.readLock().unlock();
+      result = getAndCreateThreadPool(mapHash);
+    }
+    return result;
+  }
+
+  public static ThreadPoolExecutor getAndCreateThreadPool(int mapHash) {

Review comment:
       Why do we have two public methods doing almost the same thing?
   I would suggest having one for getOrCreate() and another one for create() that takes the write lock.
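One way to collapse the two public methods into a single entry point is sketched below. It swaps the hand-rolled read/write lock for `ConcurrentHashMap.computeIfAbsent`, which is a different technique from the one in the PR: for a `ConcurrentHashMap`, the whole call runs atomically and the mapping function is applied at most once per key, so the existence check and the creation cannot race. The class name, the `int` key, and the pool size are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-entry-point factory; names and pool size are assumptions.
final class CallbackThreadPoolRegistry {
  private static final int DEFAULT_POOL_SIZE = 2;
  private static final ConcurrentMap<Integer, ExecutorService> POOLS = new ConcurrentHashMap<>();

  private CallbackThreadPoolRegistry() {
  }

  // The single entry point: returns the existing pool or atomically creates one.
  static ExecutorService getOrCreate(int key) {
    return POOLS.computeIfAbsent(key, k -> create());
  }

  // Creation stays private, so callers cannot bypass the map.
  private static ExecutorService create() {
    return Executors.newFixedThreadPool(DEFAULT_POOL_SIZE);
  }

  static int size() {
    return POOLS.size();
  }
}
```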

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -136,32 +135,13 @@
   private HelixCallbackMonitor _monitor;
 
   // TODO: make this be per _manager or per _listener instaed of per callbackHandler -- Lei

Review comment:
       I think this TODO is no longer needed, but please correct me if I am wrong.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -660,15 +641,12 @@ public void init() {
     logger.info("initializing CallbackHandler: {}, content: {} ", _uid, getContent());
 
     if (_batchModeEnabled) {
-      CallbackProcessor callbackProcessor = _batchCallbackProcessorRef.get();
-      if (callbackProcessor != null) {
-        callbackProcessor.resetEventQueue();
+      CallbackEventTPExecutor callbackTFExecutor = _batchCallbackExecutorrRef.get();
+      if (callbackTFExecutor != null) {
+        callbackTFExecutor.reset();
       } else {
-        callbackProcessor = new CallbackProcessor(this);
-        callbackProcessor.start();
-        if (!_batchCallbackProcessorRef.compareAndSet(null, callbackProcessor)) {
-          callbackProcessor.shutdown();

Review comment:
       Do we no longer need to shut down the thread pool?

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,76 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// returns a thread pool object given a HelixManager identifier
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+public class CallbackEventTPFactory {

Review comment:
       Just curious: what is the benefit of this additional factory layer? Since each thread pool maps one-to-one to an executor, the factory adds no sharing or reuse logic, yet it introduces considerable cleanup overhead. Note that I didn't see any logic in your current version to clean up the thread pools.
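To make the alternative behind this question concrete: if every executor owns its pool one-to-one, the factory and its bookkeeping disappear, and cleanup becomes local to the owner. A minimal sketch follows, assuming a single-worker `ThreadPoolExecutor` whose worker exits after an idle timeout, so that an idle handler holds no live thread. Whether that keeps the total thread count low enough for #1642 depends on how many handlers are busy at the same time. The class name is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical per-handler executor that owns its pool; no shared factory.
final class OwnedPoolExecutor implements AutoCloseable {
  private final ThreadPoolExecutor _pool;

  // idleSeconds must be positive: allowCoreThreadTimeOut rejects a zero keep-alive.
  OwnedPoolExecutor(long idleSeconds) {
    // At most one worker; it exits after idleSeconds without work,
    // so an idle handler holds no live thread.
    _pool = new ThreadPoolExecutor(1, 1, idleSeconds, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());
    _pool.allowCoreThreadTimeOut(true);
  }

  void submit(Runnable task) {
    _pool.execute(task);
  }

  int liveThreads() {
    return _pool.getPoolSize();
  }

  // Cleanup is local: the owner closes the pool together with the handler.
  @Override
  public void close() {
    _pool.shutdown();
  }
}
```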

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,76 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// returns a thread pool object given a HelixManager identifier
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+
+  public static ThreadPoolExecutor getThreadPool(int mapHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result;
+    if (_managerToCallBackThreadPoolMap.containsKey(mapHash)) {
+      result = _managerToCallBackThreadPoolMap.get(mapHash);
+      _lock.readLock().unlock();
+    } else {
+      _lock.readLock().unlock();
+      result = getAndCreateThreadPool(mapHash);
+    }
+    return result;
+  }
+
+  public static ThreadPoolExecutor getAndCreateThreadPool(int mapHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(mapHash)) {
+      // downgrade to read lock
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(mapHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", mapHash)).build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

Review comment:
       When is the newly created executor shut down?
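Whichever component ends up owning the pool, a common way to close it is the two-phase pattern from the `ExecutorService` Javadoc: `shutdown()` to stop accepting new tasks, a bounded `awaitTermination`, then `shutdownNow()` as a fallback. A minimal sketch of such a helper follows; the class name and the idea of calling it from the handler's reset/shutdown path are assumptions, not part of this PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper following the ExecutorService Javadoc's shutdown pattern.
final class ExecutorShutdown {
  private ExecutorShutdown() {
  }

  // Returns true if the pool terminated within the allotted time.
  static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
    pool.shutdown(); // stop accepting new tasks; already queued tasks still run
    try {
      if (pool.awaitTermination(timeout, unit)) {
        return true;
      }
      pool.shutdownNow(); // interrupt running tasks and drop queued ones
      return pool.awaitTermination(timeout, unit);
    } catch (InterruptedException e) {
      pool.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return false;
    }
  }
}
```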






[GitHub] [helix] jiajunwang commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r583230706



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {

Review comment:
       The input hash does not need to be a manager's hash, so just name it as a general hash number. Or, if you prefer to restrict it to the manager's hash, then the method should take the HelixManager object directly.
   
   In either case, please refine the other related names accordingly.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {

Review comment:
       Nit: IMHO, we should spell out full words in class/method names. TP is not a commonly used abbreviation for thread pool. I would suggest just using CallbackThreadPoolFactory here.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", managerHash))
+          .build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

Review comment:
       On second thought, I think we should not keep so many threads. Otherwise, even if the manager has only one callback for a path, we will still create up to five threads, which does not make sense.
   One reasonable way is to set the core thread count to the required callback count. This could be derived from the HelixManager listener fields. However, there seems to be no easy way to get that number directly, even if we have the manager object.
   
   Let's think more about it.
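As one possible shape for matching the core thread count to the number of callbacks, the pool could be resized as handlers register and unregister, using `ThreadPoolExecutor.setCorePoolSize` and `setMaximumPoolSize`. The order of the two calls matters: newer JDKs reject a core size above the maximum, and every JDK rejects a maximum below the core size, so the sketch grows the maximum first and shrinks the core first. It assumes each handler calls register/unregister explicitly, which sidesteps deriving the count from HelixManager's listener fields; the class and method names and the 30-second idle timeout are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical pool sized to one thread per registered callback handler.
final class HandlerSizedPool {
  private final ThreadPoolExecutor _pool;
  private int _handlers = 0; // guarded by this

  HandlerSizedPool() {
    _pool = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    _pool.allowCoreThreadTimeOut(true); // idle threads exit after 30 s (assumed value)
  }

  synchronized void registerHandler() {
    _handlers++;
    // Grow the maximum first: the core size may not exceed the maximum.
    _pool.setMaximumPoolSize(_handlers);
    _pool.setCorePoolSize(_handlers);
  }

  synchronized void unregisterHandler() {
    if (_handlers == 0) {
      throw new IllegalStateException("no registered handlers");
    }
    _handlers--;
    int size = Math.max(1, _handlers); // a ThreadPoolExecutor needs a maximum >= 1
    // Shrink the core first: the maximum may not drop below the core size.
    _pool.setCorePoolSize(size);
    _pool.setMaximumPoolSize(size);
  }

  synchronized int corePoolSize() {
    return _pool.getCorePoolSize();
  }

  ThreadPoolExecutor pool() {
    return _pool;
  }
}
```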

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", managerHash))
+          .build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+          namedThreadFactory);
+      result.allowCoreThreadTimeOut(true);
+      _managerToCallBackThreadPoolMap.put(managerHash, result);
+      _callBackEventProcessorCountPerThreadPool.put(managerHash, new AtomicInteger(1));
+    }
+    _lock.writeLock().unlock();
+    return result;
+  }
+
+  public static void unregisterEventProcessor(int managerHash) {
+    ThreadPoolExecutor threadPoolToClose = null;
+    _lock.writeLock().lock();
+    int newVal = _callBackEventProcessorCountPerThreadPool.get(managerHash).decrementAndGet();
+    if (newVal == 0) {
+      _callBackEventProcessorCountPerThreadPool.remove(managerHash);
+      threadPoolToClose = _managerToCallBackThreadPoolMap.get(managerHash);
+      _managerToCallBackThreadPoolMap.remove(managerHash);
+    }

Review comment:
       Nit, a slightly simpler version:
   
   if (_callBackEventProcessorCountPerThreadPool.get(managerHash).decrementAndGet() == 0) {
     _callBackEventProcessorCountPerThreadPool.remove(managerHash);
     threadPoolToClose = _managerToCallBackThreadPoolMap.remove(managerHash);
   }
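A self-contained sketch of reference-counted release along the lines of this suggestion follows. `decrementAndGet()` returns an `int`, so it has to be compared with zero to form an `if` condition; `Map.remove` returns the removed pool, so no separate `get` is needed; and the pool is shut down only after the lock is released. The class, its method names, and the single-thread pools are hypothetical stand-ins for the factory in the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted pool registry; names are illustrative only.
final class RefCountedPools {
  private final Object _lock = new Object();
  private final Map<Integer, ExecutorService> _pools = new HashMap<>();
  private final Map<Integer, AtomicInteger> _refCounts = new HashMap<>();

  ExecutorService acquire(int key) {
    synchronized (_lock) {
      _refCounts.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
      return _pools.computeIfAbsent(key, k -> Executors.newSingleThreadExecutor());
    }
  }

  void release(int key) {
    ExecutorService toClose = null;
    synchronized (_lock) {
      AtomicInteger count = _refCounts.get(key);
      if (count == null) {
        throw new IllegalStateException("release without acquire for key " + key);
      }
      // decrementAndGet() returns an int, so compare it with zero explicitly.
      if (count.decrementAndGet() == 0) {
        _refCounts.remove(key);
        toClose = _pools.remove(key); // remove() returns the removed value
      }
    }
    // Shut down outside the lock so other callers are not blocked meanwhile.
    if (toClose != null) {
      toClose.shutdown();
    }
  }

  boolean contains(int key) {
    synchronized (_lock) {
      return _pools.containsKey(key);
    }
  }
}
```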

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
##########
@@ -0,0 +1,122 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.common.DedupEventBlockingQueue;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Each batch mode is enabled, each CallbackHandler has a CallbackEventTPExecutor.
+ * It has a dedupe queue for pending call back event. Pending call back event will
+ * be submitted to a thread pool one at a time when
+ * 1. This is the first ever call back event for the callBackHandler, or
+ * 2. The previous call back event handling process is finished in thread pool.
+ */
+
+public class CallbackEventExecutor {
+  private static Logger logger = LoggerFactory.getLogger(CallbackHandler.class);
+
+  private DedupEventBlockingQueue<NotificationContext.Type, NotificationContext>
+      _callBackEventQueue;
+  private final HelixManager _manager;
+  private Future _futureCallBackProcessEvent = null;
+  private final ThreadPoolExecutor _threadPoolExecutor;
+
+  public CallbackEventExecutor(HelixManager manager) {
+    _callBackEventQueue = new DedupEventBlockingQueue<>();
+    _manager = manager;
+    _threadPoolExecutor = CallbackEventTPFactory.getOrCreateThreadPool(manager.hashCode());
+  }
+
+  class CallbackProcessor implements Runnable {
+    private CallbackHandler _handler;
+    protected String _processorName;
+    NotificationContext _event;

Review comment:
       Nit: make this field final?
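The Javadoc in this file describes the intended flow: each handler keeps a dedup queue and submits at most one event at a time to the shared pool, scheduling the next one only after the previous one finishes. A minimal sketch of that pattern, with the fields made `final` where possible, might look like the following. It is not the Helix implementation: a `LinkedHashMap` keyed by event type stands in for `DedupEventBlockingQueue` (a newer event of the same type replaces the pending one), and the class name and generic parameters are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical per-handler serial executor on a shared pool; not Helix code.
final class SerialDedupExecutor<K, E> {
  private final Executor _sharedPool;
  private final Consumer<E> _handler;
  // Pending events keyed by type; a newer event of the same type replaces the old one.
  private final Map<K, E> _pending = new LinkedHashMap<>();
  private boolean _running = false; // guarded by this

  SerialDedupExecutor(Executor sharedPool, Consumer<E> handler) {
    _sharedPool = sharedPool;
    _handler = handler;
  }

  synchronized void submit(K type, E event) {
    _pending.put(type, event);
    if (!_running) { // no task in flight for this handler: schedule one
      _running = true;
      _sharedPool.execute(this::drainOne);
    }
  }

  // Handles exactly one pending event, then reschedules itself if more are pending.
  private void drainOne() {
    E event;
    synchronized (this) {
      Iterator<Map.Entry<K, E>> it = _pending.entrySet().iterator();
      event = it.next().getValue(); // non-empty: a task is only scheduled after a put
      it.remove();
    }
    try {
      _handler.accept(event);
    } finally {
      synchronized (this) {
        if (_pending.isEmpty()) {
          _running = false; // the next submit() schedules a new task
        } else {
          _sharedPool.execute(this::drainOne); // hand the next event to the pool
        }
      }
    }
  }
}
```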

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", managerHash))
+          .build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

Review comment:
       This is almost the same as Executors.newFixedThreadPool(). If we don't need the advanced features, I suggest using the factory method that exposes fewer options, so there are fewer potential bugs.
   Also, please don't hardcode the thread count in the code. At minimum, it should be a default value in a constant field of the class.
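A minimal sketch of the suggestion. It uses a plain JDK thread factory in place of the PR's Guava ThreadFactoryBuilder. `CallbackPoolSketch`, `newCallbackPool`, and `DEFAULT_CALLBACK_POOL_SIZE` are hypothetical names, and 5 is simply the value hardcoded in the diff:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; the class, method, and constant names are illustrative, not Helix code.
final class CallbackPoolSketch {
  // Default pool size kept in one named constant rather than a literal at the call site.
  static final int DEFAULT_CALLBACK_POOL_SIZE = 5;

  private CallbackPoolSketch() {}

  static ExecutorService newCallbackPool(int managerHash) {
    return newCallbackPool(managerHash, DEFAULT_CALLBACK_POOL_SIZE);
  }

  static ExecutorService newCallbackPool(int managerHash, int poolSize) {
    AtomicInteger threadIndex = new AtomicInteger();
    // Plain-JDK stand-in for Guava's ThreadFactoryBuilder().setNameFormat(...).
    ThreadFactory namedThreadFactory = runnable -> {
      Thread thread = new Thread(runnable,
          "CallbackHandlerExecutorService-" + managerHash + "-" + threadIndex.getAndIncrement());
      // Daemon threads so an idle sketch pool does not keep the JVM alive (an assumption).
      thread.setDaemon(true);
      return thread;
    };
    // newFixedThreadPool only exposes the size and the thread factory,
    // which leaves fewer knobs to misconfigure.
    return Executors.newFixedThreadPool(poolSize, namedThreadFactory);
  }
}
```

Executors.newFixedThreadPool(n, factory) creates a ThreadPoolExecutor with core and max both set to n, a zero keep-alive, and an unbounded LinkedBlockingQueue. The only visible difference from the constructor call in the diff is therefore the 1-second keep-alive. That setting has no effect unless core-thread timeout is enabled somewhere outside the quoted hunk.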

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
##########
@@ -0,0 +1,122 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.common.DedupEventBlockingQueue;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Each batch mode is enabled, each CallbackHandler has a CallbackEventTPExecutor.
+ * It has a dedupe queue for pending call back event. Pending call back event will
+ * be submitted to a thread pool one at a time when
+ * 1. This is the first ever call back event for the callBackHandler, or
+ * 2. The previous call back event handling process is finished in thread pool.
+ */
+
+public class CallbackEventExecutor {
+  private static Logger logger = LoggerFactory.getLogger(CallbackHandler.class);
+
+  private DedupEventBlockingQueue<NotificationContext.Type, NotificationContext>
+      _callBackEventQueue;
+  private final HelixManager _manager;
+  private Future _futureCallBackProcessEvent = null;
+  private final ThreadPoolExecutor _threadPoolExecutor;
+
+  public CallbackEventExecutor(HelixManager manager) {
+    _callBackEventQueue = new DedupEventBlockingQueue<>();
+    _manager = manager;
+    _threadPoolExecutor = CallbackEventTPFactory.getOrCreateThreadPool(manager.hashCode());
+  }
+
+  class CallbackProcessor implements Runnable {
+    private CallbackHandler _handler;
+    protected String _processorName;
+    NotificationContext _event;
+
+    public CallbackProcessor(CallbackHandler handler, NotificationContext event) {
+      _processorName = _manager.getClusterName() + "CallbackProcessor@" + Integer
+          .toHexString(handler.hashCode());
+      _handler = handler;
+      _event = event;
+    }
+
+    @Override
+    public void run() {
+      try {
+        _handler.invoke(_event);
+      } catch (ZkInterruptedException e) {
+        logger.warn(_processorName + " thread caught a ZK connection interrupt", e);
+      } catch (ThreadDeath death) {
+        throw death;

Review comment:
       I think we need to call submitPendingHandleCallBackEventToManagerThreadPool() no matter which exception the previous execution threw. So the call should either go in a finally block, or this rethrow should become an error log too.
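A minimal sketch of the finally-based fix. It uses simplified types: String events, a Consumer handler, and a plain Executor. `SerialCallbackSketch` and its methods are hypothetical and only mirror the structure of CallbackEventExecutor. A LinkedHashMap stands in for DedupEventBlockingQueue.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for CallbackEventExecutor: events are Strings keyed by a
// String type, and the handler is a Consumer<String> instead of a CallbackHandler.
final class SerialCallbackSketch {
  private final Executor pool;
  private final Consumer<String> handler;
  // Pending events de-duplicated by type: a newer event replaces an older one of the same type.
  private final Map<String, String> pending = new LinkedHashMap<>();
  private boolean inFlight = false; // guarded by 'pending'

  SerialCallbackSketch(Executor pool, Consumer<String> handler) {
    this.pool = pool;
    this.handler = handler;
  }

  void submit(String type, String event) {
    synchronized (pending) {
      if (inFlight) {
        pending.put(type, event); // a task is already running; it will pick this up
        return;
      }
      inFlight = true;
    }
    pool.execute(() -> process(event));
  }

  private void process(String event) {
    try {
      handler.accept(event);
    } catch (RuntimeException e) {
      System.err.println("callback failed: " + e); // stand-in for logger.error(...)
    } finally {
      // Runs on every exit path, including an Error that propagates past the catch,
      // so queued events are never stranded behind a failed one.
      submitNextPending();
    }
  }

  private void submitNextPending() {
    String next;
    synchronized (pending) {
      Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
      if (!it.hasNext()) {
        inFlight = false; // cleared under the same lock that submit() checks
        return;
      }
      next = it.next().getValue();
      it.remove();
    }
    pool.execute(() -> process(next));
  }
}
```

One design choice is worth flagging. This sketch tracks the in-flight task with a flag that is cleared under the same lock that submit() checks, rather than with Future.isDone(). The isDone() check in the diff appears to leave a window between two moments: after the running task finds the queue empty, and before its future completes. An event queued during that window would wait until the next event arrives.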

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();

Review comment:
       Do we still need the read lock when the write lock is already held?
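This small check uses only java.util.concurrent.locks.ReentrantReadWriteLock; `ReadUnderWriteDemo` is a hypothetical name. A thread that holds the write lock may also take the read lock. Doing so neither releases the write lock nor adds protection, because the write lock already excludes every other reader and writer.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; it only exercises the JDK lock, not Helix code.
final class ReadUnderWriteDemo {
  private ReadUnderWriteDemo() {}

  // Returns {write held after taking read, write held after releasing read, write held at the end}.
  static boolean[] observe() {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    lock.writeLock().lock();
    // Legal: the write-lock holder may also acquire the read lock (reentrancy).
    lock.readLock().lock();
    boolean heldAfterReadLock = lock.isWriteLockedByCurrentThread();
    lock.readLock().unlock();
    // Releasing the read lock does not touch the write lock either.
    boolean heldAfterReadUnlock = lock.isWriteLockedByCurrentThread();
    // Only an explicit unlock releases the write lock.
    lock.writeLock().unlock();
    boolean heldAtEnd = lock.isWriteLocked();
    return new boolean[] {heldAfterReadLock, heldAfterReadUnlock, heldAtEnd};
  }
}
```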

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
##########
@@ -0,0 +1,122 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.common.DedupEventBlockingQueue;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Each batch mode is enabled, each CallbackHandler has a CallbackEventTPExecutor.
+ * It has a dedupe queue for pending call back event. Pending call back event will
+ * be submitted to a thread pool one at a time when
+ * 1. This is the first ever call back event for the callBackHandler, or
+ * 2. The previous call back event handling process is finished in thread pool.
+ */
+
+public class CallbackEventExecutor {
+  private static Logger logger = LoggerFactory.getLogger(CallbackHandler.class);
+
+  private DedupEventBlockingQueue<NotificationContext.Type, NotificationContext>
+      _callBackEventQueue;
+  private final HelixManager _manager;
+  private Future _futureCallBackProcessEvent = null;
+  private final ThreadPoolExecutor _threadPoolExecutor;
+
+  public CallbackEventExecutor(HelixManager manager) {
+    _callBackEventQueue = new DedupEventBlockingQueue<>();
+    _manager = manager;
+    _threadPoolExecutor = CallbackEventTPFactory.getOrCreateThreadPool(manager.hashCode());
+  }
+
+  class CallbackProcessor implements Runnable {
+    private CallbackHandler _handler;
+    protected String _processorName;
+    NotificationContext _event;
+
+    public CallbackProcessor(CallbackHandler handler, NotificationContext event) {
+      _processorName = _manager.getClusterName() + "CallbackProcessor@" + Integer
+          .toHexString(handler.hashCode());
+      _handler = handler;
+      _event = event;
+    }
+
+    @Override
+    public void run() {
+      try {
+        _handler.invoke(_event);
+      } catch (ZkInterruptedException e) {
+        logger.warn(_processorName + " thread caught a ZK connection interrupt", e);
+      } catch (ThreadDeath death) {
+        throw death;
+      } catch (Throwable t) {
+        logger.error(_processorName + " thread failed while running " + _processorName, t);
+      }
+      submitPendingHandleCallBackEventToManagerThreadPool(_handler);
+    }
+  }
+
+  public void submitEventToEvecutor(NotificationContext.Type eventType, NotificationContext event,
+      CallbackHandler handler) {
+    synchronized (_callBackEventQueue) {
+      if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) {
+        _futureCallBackProcessEvent =
+            _threadPoolExecutor.submit(new CallbackProcessor(handler, event));
+      } else {
+        _callBackEventQueue.put(eventType, event);
+      }
+    }
+  }
+
+  private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler handler) {
+    synchronized (_callBackEventQueue) {
+      if (_callBackEventQueue.size() != 0) {
+        try {
+          NotificationContext event = _callBackEventQueue.take();
+          _futureCallBackProcessEvent =
+              _threadPoolExecutor.submit(new CallbackProcessor(handler, event));
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  public void reset() {
+    synchronized (_callBackEventQueue) {
+      _callBackEventQueue.clear();
+      if (_futureCallBackProcessEvent != null) {
+        _futureCallBackProcessEvent.cancel(false);
+      }
+    }
+  }
+
+  public void unregisterFromFactory() {
+    reset();
+    CallbackEventTPFactory.unregisterEventProcessor(_manager.hashCode());

Review comment:
       You will also need to set _threadPoolExecutor = null.
   
   Moreover, to make sure this is safe after unregistering, the executor should reject any new task submission after this call.
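A minimal sketch of both suggestions. It uses a plain Executor in place of the shared ThreadPoolExecutor; `UnregisterableExecutorSketch` is a hypothetical name. Because _threadPoolExecutor is declared final in the diff, nulling it would also mean dropping that modifier. In the sketch, unregister() clears the reference under a lock, and submit() checks it under the same lock, so no task reaches the pool after unregister() returns.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch; not Helix's CallbackEventExecutor.
final class UnregisterableExecutorSketch {
  private final Object lock = new Object();
  private Executor executor; // not final, so unregister() can clear it; guarded by lock

  UnregisterableExecutorSketch(Executor executor) {
    this.executor = executor;
  }

  void submit(Runnable task) {
    synchronized (lock) {
      if (executor == null) {
        throw new RejectedExecutionException("executor has been unregistered");
      }
      // Hand off while holding the lock so unregister() cannot run between the check and the hand-off.
      executor.execute(task);
    }
  }

  void unregister() {
    synchronized (lock) {
      // Equivalent of "_threadPoolExecutor = null": later submit() calls are rejected.
      executor = null;
    }
  }

  boolean isUnregistered() {
    synchronized (lock) {
      return executor == null;
    }
  }
}
```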




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] jiajunwang commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r583859723



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();

Review comment:
       I think you need to unlock the write lock to achieve what you describe; it does not happen automatically. Please correct me if I am wrong.
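This sketch shows the downgrade sequence described in the ReentrantReadWriteLock Javadoc: take the read lock, then explicitly release the write lock. It applies that sequence to a get-or-create map keyed by an int, like the PR's managerHash. `DowngradeRegistrySketch` and `getOrCreateThenRead` are hypothetical names. The reader callback exists only to give the downgraded read phase something to do. The read-lock fast path from getOrCreateThreadPool is omitted for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical registry keyed by an int (like the PR's managerHash); names are illustrative.
final class DowngradeRegistrySketch<V> {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Map<Integer, V> values = new HashMap<>();

  // Gets or creates the value for key, then applies reader to it while holding only the read lock.
  <R> R getOrCreateThenRead(int key, Supplier<V> factory, Function<V, R> reader) {
    V value;
    lock.writeLock().lock();
    try {
      value = values.get(key); // (re-)check under the write lock
      if (value == null) {
        value = factory.get();
        values.put(key, value);
      }
      // Step 1 of the downgrade: acquire the read lock while still holding the write lock.
      lock.readLock().lock();
    } finally {
      // Step 2: release the write lock explicitly; taking the read lock did not do this.
      lock.writeLock().unlock();
    }
    try {
      // Read-only phase: other readers may enter, writers stay excluded until we unlock.
      return reader.apply(value);
    } finally {
      lock.readLock().unlock();
    }
  }

  boolean isWriteLocked() {
    return lock.isWriteLocked();
  }
}
```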






[GitHub] [helix] xyuanlu commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r588919595



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", managerHash))
+          .build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

Review comment:
       Per our offline discussion, I will keep the ThreadPoolExecutor constructor but update the TTL and the thread count.






[GitHub] [helix] jiajunwang commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r580688682



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,76 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// returns a thread pool object given a HelixManager identifier
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+public class CallbackEventTPFactory {

Review comment:
       I see. Makes sense. Thanks for the explanation.






[GitHub] [helix] xyuanlu commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r583967476



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _lock.readLock().unlock();
+    } else {
+      ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+          .setNameFormat(String.format("CallbackHandlerExecutorService - %s ", managerHash))
+          .build();
+      result = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

Review comment:
       From the Javadoc:
   https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#allowsCoreThreadTimeOut()
   ```
   If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.
   ```
   With an unbounded queue, core = 1 and max = 5, every new task except the first is pushed to the queue, so the thread count always stays at 1. I think the only way to get a thread pool that grows and shrinks dynamically with an unbounded queue is to set coreThread == maxThread == X and keepAliveTime = Y (with core-thread timeout allowed).
   However, the pool then has 0 idle threads once the timeout expires, so I decided to switch back to a fixed-size thread pool.
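This small experiment illustrates the queuing rule quoted above using java.util.concurrent.ThreadPoolExecutor. `PoolGrowthDemo` and its helpers are hypothetical names, and the 50 ms keep-alive is arbitrary. With core = 1, max = 5 and an unbounded queue, five blocked tasks share a single thread. With core == max == 5 and allowCoreThreadTimeOut(true), the pool starts five threads. Once those threads sit idle past the keep-alive, the pool drops to zero.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class exercising only JDK behavior.
final class PoolGrowthDemo {
  private PoolGrowthDemo() {}

  // core = 1, max = 5, unbounded queue: extra threads are only created when the queue is full,
  // which never happens here.
  static ThreadPoolExecutor coreOneMaxFive() {
    return new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
  }

  // core == max == 5 with core-thread timeout: grows to 5 under load, shrinks to 0 when idle.
  static ThreadPoolExecutor fixedFiveWithCoreTimeout(long keepAliveMillis) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        5, 5, keepAliveMillis, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  // Submits tasks that block on a latch, records the live thread count, then releases them.
  static int poolSizeWhileBusy(ThreadPoolExecutor pool, int tasks) {
    CountDownLatch release = new CountDownLatch(1);
    for (int i = 0; i < tasks; i++) {
      pool.execute(() -> {
        try {
          release.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    int size = pool.getPoolSize();
    release.countDown();
    return size;
  }

  // Polls until the live thread count reaches target or the timeout passes; returns the last count.
  static int waitForPoolSize(ThreadPoolExecutor pool, int target, long timeoutMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (pool.getPoolSize() != target && System.nanoTime() - deadline < 0) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return pool.getPoolSize();
  }
}
```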






[GitHub] [helix] xyuanlu commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r583878348



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPFactory.java
##########
@@ -0,0 +1,96 @@
+package org.apache.helix.manager.zk;
+
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// returns a thread pool object given a HelixManager identifier
+public class CallbackEventTPFactory {
+  static private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  static private Map<Integer, ThreadPoolExecutor> _managerToCallBackThreadPoolMap = new HashMap();
+  static private Map<Integer, AtomicInteger> _callBackEventProcessorCountPerThreadPool =
+      new HashMap();
+
+  public static ThreadPoolExecutor getOrCreateThreadPool(int managerHash) {
+    // should not use general lock for read
+    _lock.readLock().lock();
+    ThreadPoolExecutor result = null;
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      result = _managerToCallBackThreadPoolMap.get(managerHash);
+      _callBackEventProcessorCountPerThreadPool.get(managerHash).incrementAndGet();
+    }
+    _lock.readLock().unlock();
+    if (result == null) {
+      result = getOrCreateThreadPoolHelper(managerHash);
+    }
+    return result;
+  }
+
+  private static ThreadPoolExecutor getOrCreateThreadPoolHelper(int managerHash) {
+    ThreadPoolExecutor result;
+    _lock.writeLock().lock();
+    // first check if the key is already in the map
+    if (_managerToCallBackThreadPoolMap.containsKey(managerHash)) {
+      // downgrade to read lock since we dont need to modify the map
+      _lock.readLock().lock();

Review comment:
       Yes, you are correct. Updated.






[GitHub] [helix] xyuanlu commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r580649063



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventTPExecutor.java
##########
@@ -0,0 +1,117 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.common.DedupEventBlockingQueue;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * When batch mode is enabled, each CallbackHandler has a CallbackEventTPExecutor.
+ * It keeps a deduplicating queue of pending callback events. Pending events are
+ * submitted to a thread pool one at a time, when
+ * 1. this is the first callback event for the CallbackHandler, or
+ * 2. handling of the previous callback event in the thread pool has finished.
+ */
+
+public class CallbackEventTPExecutor {

Review comment:
       TFTR. Updated.
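The scheduling rule in the Javadoc above (at most one event per handler in flight, the next one submitted when the previous finishes) can be sketched as follows. This assumes that a newer pending event of the same type replaces the older one while keeping its place in the queue; `SerialEventExecutor` is a hypothetical name, and this is not Helix's `DedupEventBlockingQueue` or `CallbackEventTPExecutor`:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

// Hypothetical per-handler executor: at most one event in flight on a shared pool.
// Not Helix's CallbackEventTPExecutor; the dedup rule below is an assumption.
final class SerialEventExecutor<K, E> {
  private final ExecutorService sharedPool;
  private final Consumer<E> handler;
  // Pending events keyed by type. put() on an existing key replaces the event
  // but keeps the key's original position (LinkedHashMap insertion order).
  private final LinkedHashMap<K, E> pending = new LinkedHashMap<>();
  private boolean inFlight = false; // guarded by this

  SerialEventExecutor(ExecutorService sharedPool, Consumer<E> handler) {
    this.sharedPool = sharedPool;
    this.handler = handler;
  }

  synchronized void submit(K type, E event) {
    pending.put(type, event);
    if (!inFlight) {
      scheduleNextLocked(); // nothing is running for this handler, so start right away
    }
  }

  // Caller must hold this object's monitor.
  private void scheduleNextLocked() {
    Iterator<Map.Entry<K, E>> it = pending.entrySet().iterator();
    if (!it.hasNext()) {
      inFlight = false;
      return;
    }
    E next = it.next().getValue();
    it.remove();
    inFlight = true;
    try {
      sharedPool.execute(() -> {
        try {
          handler.accept(next);
        } finally {
          // The previous event finished, so hand over the next pending one (if any).
          synchronized (SerialEventExecutor.this) {
            scheduleNextLocked();
          }
        }
      });
    } catch (RejectedExecutionException e) {
      inFlight = false; // the removed event is dropped; a later submit can schedule again
      throw e;
    }
  }
}
```

Because each event is scheduled from the completion of the previous one, many handlers can share a small pool without any single handler processing two events concurrently.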






[GitHub] [helix] lei-xia commented on pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
lei-xia commented on pull request #1645:
URL: https://github.com/apache/helix/pull/1645#issuecomment-776271851


   I think we have a pending discussion on how to utilize the thread pool, right? Have we reached agreement on the design?




[GitHub] [helix] xyuanlu commented on a change in pull request #1645: Use thread pool for batched call back handling events

Posted by GitBox <gi...@apache.org>.
xyuanlu commented on a change in pull request #1645:
URL: https://github.com/apache/helix/pull/1645#discussion_r583334615



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
##########
@@ -0,0 +1,122 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.common.DedupEventBlockingQueue;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * When batch mode is enabled, each CallbackHandler has a CallbackEventExecutor.
+ * It keeps a deduplicating queue of pending callback events. Pending events are
+ * submitted to a thread pool one at a time, when
+ * 1. this is the first callback event for the CallbackHandler, or
+ * 2. handling of the previous callback event in the thread pool has finished.
+ */
+
+public class CallbackEventExecutor {
+  private static final Logger logger = LoggerFactory.getLogger(CallbackEventExecutor.class);
+
+  private DedupEventBlockingQueue<NotificationContext.Type, NotificationContext>
+      _callBackEventQueue;
+  private final HelixManager _manager;
+  private Future<?> _futureCallBackProcessEvent = null;
+  private final ThreadPoolExecutor _threadPoolExecutor;
+
+  public CallbackEventExecutor(HelixManager manager) {
+    _callBackEventQueue = new DedupEventBlockingQueue<>();
+    _manager = manager;
+    _threadPoolExecutor = CallbackEventTPFactory.getOrCreateThreadPool(manager.hashCode());
+  }
+
+  class CallbackProcessor implements Runnable {
+    private CallbackHandler _handler;
+    protected String _processorName;
+    NotificationContext _event;

Review comment:
       TFTR. Updated.
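The `_futureCallBackProcessEvent` field suggests the in-flight task is tracked through the `Future` returned by the shared pool. A minimal sketch of that idea, assuming the executor should refuse a new submission while a task is still pending or running and should cancel the in-flight task when its handler is reset; `TrackedSubmitter`, `trySubmit` and `reset` are hypothetical names, not Helix APIs:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical sketch: track the in-flight task through the Future returned by the pool,
// in the spirit of the _futureCallBackProcessEvent field. Not a Helix API.
final class TrackedSubmitter {
  private final ExecutorService pool;
  private Future<?> inFlight; // guarded by this

  TrackedSubmitter(ExecutorService pool) {
    this.pool = pool;
  }

  // Submits the task only if nothing is queued or running; otherwise the caller
  // keeps the event pending and tries again later.
  synchronized boolean trySubmit(Runnable task) {
    if (inFlight != null && !inFlight.isDone()) {
      return false;
    }
    inFlight = pool.submit(task);
    return true;
  }

  // Assumed reset semantics: cancel the in-flight task. cancel(true) interrupts the
  // worker thread, so it only stops a running task that responds to interruption.
  synchronized void reset() {
    if (inFlight != null) {
      inFlight.cancel(true);
      inFlight = null;
    }
  }
}
```

Note that a cancelled `Future` reports `isDone()` as true, so after a reset the next submission is accepted even if the interrupted task is still unwinding on the pool thread.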



