Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/09/03 09:45:38 UTC

[GitHub] [hbase] virajjasani commented on a change in pull request #2343: HBASE-24962 Optimize BufferNode Lock

virajjasani commented on a change in pull request #2343:
URL: https://github.com/apache/hbase/pull/2343#discussion_r482817164



##########
File path: hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
##########
@@ -298,6 +299,8 @@ default boolean storeInDispatchedQueue() {
   // ============================================================================================
   private final class TimeoutExecutorThread extends Thread {
     private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
+    private final ConcurrentHashMap<DelayedWithTimeout, DelayedWithTimeout> pendingBufferNode =

Review comment:
       nit: please declare this field as `ConcurrentMap`
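A minimal sketch of what the nit asks for, assuming the map's key and value types stay as in the diff. The field is typed as the `ConcurrentMap` interface and `ConcurrentHashMap` appears only at construction. `putIfAbsent` and `remove` are declared on `ConcurrentMap`, so nothing is lost. `PendingRegistry` and its methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical holder: the field is declared as the ConcurrentMap interface,
// and the concrete ConcurrentHashMap type appears only at construction.
public class PendingRegistry<T> {
  private final ConcurrentMap<T, T> pending = new ConcurrentHashMap<>();

  // putIfAbsent is part of ConcurrentMap, so callers do not need the concrete type.
  public boolean register(T item) {
    return pending.putIfAbsent(item, item) == null;
  }

  // remove returns the previous value, or null if there was no entry.
  public boolean unregister(T item) {
    return pending.remove(item) != null;
  }

  public static void main(String[] args) {
    PendingRegistry<String> r = new PendingRegistry<>();
    System.out.println(r.register("rs1"));   // true: first registration
    System.out.println(r.register("rs1"));   // false: already pending
    System.out.println(r.unregister("rs1")); // true: entry removed
  }
}
```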

##########
File path: hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
##########
@@ -314,17 +317,26 @@ public void run() {
         if (task instanceof DelayedTask) {
           threadPool.execute(((DelayedTask) task).getObject());
         } else {
+          pendingBufferNode.remove(task);
           ((BufferNode) task).dispatch();
         }
       }
     }
 
+    public void putIfAbsent(BufferNode bufferNode) {

Review comment:
       nit: keep it private

##########
File path: hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
##########
@@ -370,39 +383,47 @@ public TRemote getKey() {
     }
 
     @Override
-    public synchronized void add(final RemoteProcedure operation) {
-      if (this.operations == null) {
-        this.operations = new HashSet<>();
-        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
-        timeoutExecutor.add(this);
-      }
+    public void add(final RemoteProcedure operation) {
       this.operations.add(operation);
       if (this.operations.size() > queueMaxSize) {
-        timeoutExecutor.remove(this);
-        dispatch();
+        synchronized (lock) {
+          if (this.operations.size() > queueMaxSize) {
+            timeoutExecutor.remove(this);
+            dispatch();
+          }
+          //all procedure have been scheduled by the current thread or another thread.
+          return;
+        }
       }
+      timeoutExecutor.putIfAbsent(this);
     }
 
     @Override
-    public synchronized void dispatch() {
-      if (operations != null) {
-        remoteDispatch(getKey(), operations);
-        operations.stream().filter(operation -> operation.storeInDispatchedQueue())
-            .forEach(operation -> dispatchedOperations.add(operation));
-        this.operations = null;
+    public void dispatch() {
+      Set<RemoteProcedure> operationsTmp = operations;
+      operations = Sets.newConcurrentHashSet();
+      if (operationsTmp.isEmpty()) {
+        return;
       }
+      remoteDispatch(getKey(), operations);
+      operations.stream().filter(operation -> operation.storeInDispatchedQueue())
+        .forEach(operation -> dispatchedOperations.add(operation));

Review comment:
       This can be replaced with method references:
   ```
         operations.stream().filter(RemoteProcedure::storeInDispatchedQueue)
           .forEach(dispatchedOperations::add);
   ```
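A hedged sketch of the method-reference form applied to a swap-then-dispatch `dispatch()`. As quoted in the hunk, `remoteDispatch` and the stream read `operations` after it has already been reassigned to a new empty set, while the swapped-out set is held in `operationsTmp`. The sketch therefore works on the snapshot. `Op` is a hypothetical, simplified stand-in for `RemoteProcedure`, and `SwapAndDispatch` is an illustrative name, not HBase code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for RemoteProcedure.
interface Op {
  boolean storeInDispatchedQueue();
}

public class SwapAndDispatch {
  private volatile Set<Op> operations = ConcurrentHashMap.newKeySet();
  private final Set<Op> dispatchedOperations = ConcurrentHashMap.newKeySet();

  public void add(Op op) {
    operations.add(op);
  }

  public void dispatch() {
    // Swap in a fresh set, then work only on the snapshot that was swapped out.
    // Note: read-then-replace is not atomic; an add() racing with the swap can
    // still land in the old snapshot. This sketch does not address that.
    Set<Op> snapshot = operations;
    operations = ConcurrentHashMap.newKeySet();
    if (snapshot.isEmpty()) {
      return;
    }
    // The real class would call remoteDispatch(getKey(), snapshot) here.
    snapshot.stream()
        .filter(Op::storeInDispatchedQueue)
        .forEach(dispatchedOperations::add);
  }

  public int dispatchedCount() { return dispatchedOperations.size(); }

  public int pendingCount() { return operations.size(); }
}
```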

##########
File path: hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
##########
@@ -370,39 +383,47 @@ public TRemote getKey() {
     }
 
     @Override
-    public synchronized void add(final RemoteProcedure operation) {
-      if (this.operations == null) {

Review comment:
       Are you sure `timeoutExecutor.putIfAbsent(this);` matches the semantics of this condition?
   Judging by the `putIfAbsent()` method, we execute this block only the first time an entry is added for a new key in the map, i.e. when the concurrent map's thread-safe `putIfAbsent()` returns null. Is there any scenario in which that would not line up with the old `if (this.operations == null)` condition?
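To make the question concrete, here is a hypothetical model of the new guard (not the HBase class): scheduling happens only when `putIfAbsent` returns null, and the guard is re-armed only once the entry is removed, which the quoted `run()` loop does before calling `dispatch()`. The model does not answer whether every path that used to reset `operations` to null also removes the entry; `ScheduleOnce`, `onTimeout` and the counter are illustrative assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the scheduling guard: schedule only when
// putIfAbsent returns null, i.e. when no entry for the node is pending.
public class ScheduleOnce {
  private final ConcurrentMap<String, String> pending = new ConcurrentHashMap<>();
  private final AtomicInteger scheduled = new AtomicInteger();

  public void putIfAbsent(String node) {
    if (pending.putIfAbsent(node, node) == null) {
      // Stands in for setTimeout(...) followed by adding the node to the DelayQueue.
      scheduled.incrementAndGet();
    }
  }

  // Models the timeout thread: the entry is removed before dispatching,
  // so the next add for this node schedules a new timeout.
  public void onTimeout(String node) {
    pending.remove(node);
  }

  public int scheduledCount() { return scheduled.get(); }
}
```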

##########
File path: hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
##########
@@ -314,17 +317,26 @@ public void run() {
         if (task instanceof DelayedTask) {
           threadPool.execute(((DelayedTask) task).getObject());
         } else {
+          pendingBufferNode.remove(task);
           ((BufferNode) task).dispatch();
         }
       }
     }
 
+    public void putIfAbsent(BufferNode bufferNode) {
+      if (pendingBufferNode.putIfAbsent(bufferNode, bufferNode) == null) {

Review comment:
       If we add the same object as both key and value, why use a `ConcurrentHashMap` instead of `Sets.newConcurrentHashSet()`?
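A sketch of the set-based alternative. `Set.add` returns true only if the element was not already present, which is the same signal as `map.putIfAbsent(node, node) == null`. To keep the example dependency-free it uses the JDK's `ConcurrentHashMap.newKeySet()` in place of Guava's `Sets.newConcurrentHashSet()` that the comment mentions; `PendingSet` and its method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Set-based variant of the pending-node guard. ConcurrentHashMap.newKeySet()
// is a JDK alternative to Guava's Sets.newConcurrentHashSet().
public class PendingSet<T> {
  private final Set<T> pending = ConcurrentHashMap.newKeySet();

  // Set.add returns true only if the element was absent, which matches
  // map.putIfAbsent(node, node) == null, without storing a redundant value.
  public boolean markPending(T node) {
    return pending.add(node);
  }

  // Returns true if the node was pending and has now been cleared.
  public boolean clearPending(T node) {
    return pending.remove(node);
  }
}
```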

##########
File path: hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
##########
@@ -357,8 +369,9 @@ public void awaitTermination() {
    */
   protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
       implements RemoteNode<TEnv, TRemote> {
-    private Set<RemoteProcedure> operations;
-    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
+    private Set<RemoteProcedure> operations = Sets.newConcurrentHashSet();

Review comment:
       `RemoteProcedure` is a parameterized class, but it is used here as a raw type. Not necessarily in this PR, but we should switch to using the parameterized type throughout this class.
   
   e.g
   ```
   private Set<RemoteProcedure<TEnv, TRemote>> operations ...
   public void add(final RemoteProcedure<TEnv, TRemote> operation) ...
   protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure<TEnv, TRemote>> operations);
   ```
   Nothing urgent; this can be taken up in a separate patch to keep the purpose of each commit distinct.
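A compilable sketch of the signatures suggested above, with the type parameters carried through instead of raw types. `Proc` and `Dispatcher` are hypothetical, simplified stand-ins for `RemoteProcedure` and `RemoteProcedureDispatcher`; only the generic shape mirrors the comment.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for RemoteProcedure<TEnv, TRemote>.
interface Proc<TEnv, TRemote> {
  boolean storeInDispatchedQueue();
}

// Hypothetical dispatcher: every use of Proc carries <TEnv, TRemote>,
// so the compiler can check element types instead of emitting raw-type warnings.
abstract class Dispatcher<TEnv, TRemote> {
  private final Set<Proc<TEnv, TRemote>> operations = new HashSet<>();

  public void add(final Proc<TEnv, TRemote> operation) {
    operations.add(operation);
  }

  // Hands the buffered operations to remoteDispatch, then clears the buffer.
  public void flush(TRemote key) {
    remoteDispatch(key, operations);
    operations.clear();
  }

  protected abstract void remoteDispatch(TRemote key, Set<Proc<TEnv, TRemote>> operations);
}

public class TypedDispatcherDemo {
  public static void main(String[] args) {
    Dispatcher<Object, String> d = new Dispatcher<Object, String>() {
      @Override
      protected void remoteDispatch(String key, Set<Proc<Object, String>> ops) {
        System.out.println(key + " -> " + ops.size() + " op(s)");
      }
    };
    d.add(() -> true); // type-checked as Proc<Object, String>
    d.flush("rs1");    // prints "rs1 -> 1 op(s)"
  }
}
```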

##########
File path: hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
##########
@@ -357,8 +369,9 @@ public void awaitTermination() {
    */
   protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
       implements RemoteNode<TEnv, TRemote> {
-    private Set<RemoteProcedure> operations;
-    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
+    private Set<RemoteProcedure> operations = Sets.newConcurrentHashSet();
+    private final Set<RemoteProcedure> dispatchedOperations = Sets.newConcurrentHashSet();
+    private final Object lock = new Object();

Review comment:
       While this looks promising, `add(RemoteProcedure operation)` and `abortOperationsInQueue()` will now contend with each other. Do we expect both to synchronize on the same object?
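A hedged illustration of the contention being asked about. `add()` follows the double-checked pattern from the diff: a lock-free fast path, with the threshold flush re-checked under `lock`. A hypothetical `abortAll()`, standing in for `abortOperationsInQueue()` whose body is not shown in this thread, takes the same monitor. The two therefore serialize only when both reach their synchronized blocks at the same time. `LockedBuffer`, `abortAll` and the flush counter are illustrative assumptions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: add() locks only on the threshold path, while abortAll()
// always locks, so both serialize on the same monitor when they overlap.
public class LockedBuffer<T> {
  private final int queueMaxSize;
  private final Object lock = new Object();
  private volatile Set<T> operations = ConcurrentHashMap.newKeySet();
  private int flushes = 0; // guarded by lock

  public LockedBuffer(int queueMaxSize) {
    this.queueMaxSize = queueMaxSize;
  }

  public void add(T op) {
    operations.add(op); // lock-free fast path
    if (operations.size() > queueMaxSize) {
      synchronized (lock) {
        // Re-check under the lock: another thread may already have flushed.
        if (operations.size() > queueMaxSize) {
          operations = ConcurrentHashMap.newKeySet(); // stands in for dispatch()
          flushes++;
        }
      }
    }
    // Like the quoted diff, an add() racing with the swap can land in the
    // replaced set; this sketch does not address that.
  }

  // Hypothetical abort path sharing the monitor with the threshold flush.
  public int abortAll() {
    synchronized (lock) {
      int dropped = operations.size();
      operations = ConcurrentHashMap.newKeySet();
      return dropped;
    }
  }

  public int flushCount() {
    synchronized (lock) {
      return flushes;
    }
  }
}
```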




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org