You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2019/06/14 15:17:51 UTC

[GitHub] [hbase] openinx commented on a change in pull request #305: HBASE-22577 BufferedMutatorOverAsyncBufferedMutator.tryCompleteFuture…

openinx commented on a change in pull request #305: HBASE-22577 BufferedMutatorOverAsyncBufferedMutator.tryCompleteFuture…
URL: https://github.com/apache/hbase/pull/305#discussion_r293850928
 
 

 ##########
 File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
 ##########
 @@ -100,62 +107,64 @@ private RetriesExhaustedWithDetailsException makeError() {
     return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts);
   }
 
+  private void internalFlush() throws RetriesExhaustedWithDetailsException {
+    // should get the future array before calling mutator.flush, otherwise we may hit an infinite
+    // wait, since someone may add new future to the map after we calling the flush.
+    CompletableFuture<?>[] toWait = futures.keySet().toArray(new CompletableFuture<?>[0]);
+    mutator.flush();
+    try {
+      CompletableFuture.allOf(toWait).join();
+    } catch (CompletionException e) {
+      // just ignore, we will record the actual error in the errors field
+      LOG.debug("Flush failed, you should get an exception thrown to your code", e);
+    }
+    if (!errors.isEmpty()) {
+      RetriesExhaustedWithDetailsException error = makeError();
+      listener.onException(error, this);
+    }
+  }
+
   @Override
   public void mutate(List<? extends Mutation> mutations) throws IOException {
-    List<CompletableFuture<Void>> toBuffered = new ArrayList<>();
     List<CompletableFuture<Void>> fs = mutator.mutate(mutations);
     for (int i = 0, n = fs.size(); i < n; i++) {
       CompletableFuture<Void> toComplete = new CompletableFuture<>();
-      final int index = i;
-      addListener(fs.get(index), (r, e) -> {
+      futures.put(toComplete, toComplete);
+      Mutation mutation = mutations.get(i);
+      long heapSize = mutation.heapSize();
+      bufferedSize.addAndGet(heapSize);
+      addListener(fs.get(i), (r, e) -> {
+        futures.remove(toComplete);
+        bufferedSize.addAndGet(-heapSize);
         if (e != null) {
-          errors.add(Pair.newPair(mutations.get(index), e));
+          errors.add(Pair.newPair(mutation, e));
           toComplete.completeExceptionally(e);
         } else {
           toComplete.complete(r);
         }
       });
-      toBuffered.add(toComplete);
     }
     synchronized (this) {
-      futures.addAll(toBuffered);
-      if (futures.size() > BUFFERED_FUTURES_THRESHOLD) {
-        tryCompleteFuture();
-      }
-      if (!errors.isEmpty()) {
+      if (bufferedSize.get() > mutator.getWriteBufferSize() * 2) {
 
 Review comment:
   Why is the threshold `mutator.getWriteBufferSize() * 2` here? This probably needs a comment explaining the factor of two.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services