You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/10/13 07:27:26 UTC

bookkeeper git commit: BOOKKEEPER-924: addEntry() is susceptible to spurious wakeups

Repository: bookkeeper
Updated Branches:
  refs/heads/master bbd1eb8d8 -> bf4a4d6a0


BOOKKEEPER-924: addEntry() is susceptible to spurious wakeups

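Use Java 8 CompletableFuture instead of the wait/notify-based SyncCounter in the blocking client wrappers, so that a caller returns only after the callback has completed the future.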

Author: eolivelli <eo...@gmail.com>

Reviewers: sijie@apache.org <si...@apache.org>

Closes #60 from eolivelli/BOOKKEEPER-924 and squashes the following commits:

61e6b1a [eolivelli] BOOKKEEPER-924 addEntry() is susceptible to spurious wakeups
7d7eaf7 [eolivelli] BOOKKEEPER-924 addEntry() is susceptible to spurious wakeups
f865610 [eolivelli] BOOKKEEPER-924 addEntry() is susceptible to spurious wakeups
e75569a [eolivelli] BOOKKEEPER-924 addEntry() is susceptible to spurious wakeups
cdd32c3 [eolivelli] BOOKKEEPER-924 addEntry() is susceptible to spurious wakeups


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/bf4a4d6a
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/bf4a4d6a
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/bf4a4d6a

Branch: refs/heads/master
Commit: bf4a4d6a07f9d615752054c6743035cebd86716e
Parents: bbd1eb8
Author: eolivelli <eo...@gmail.com>
Authored: Thu Oct 13 00:27:18 2016 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Oct 13 00:27:18 2016 -0700

----------------------------------------------------------------------
 .../apache/bookkeeper/client/BookKeeper.java    | 91 ++++++--------------
 .../bookkeeper/client/BookKeeperAdmin.java      | 64 ++++++--------
 .../apache/bookkeeper/client/LedgerHandle.java  | 56 +++---------
 .../bookkeeper/client/LedgerHandleAdv.java      | 14 +--
 .../bookkeeper/client/SynchCallbackUtils.java   | 67 ++++++++++++++
 5 files changed, 139 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf4a4d6a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index b683ca4..08c24b0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -65,6 +65,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
  * BookKeeper client. We assume there is one single writer to a ledger at any
@@ -616,27 +618,20 @@ public class BookKeeper implements AutoCloseable {
     public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize,
                                      DigestType digestType, byte passwd[], final Map<String, byte[]> customMetadata)
             throws InterruptedException, BKException {
-        SyncCounter counter = new SyncCounter();
-        counter.inc();
+        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+
         /*
          * Calls asynchronous version
          */
         asyncCreateLedger(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
                           new SyncCreateCallback(), counter, customMetadata);
 
-        /*
-         * Wait
-         */
-        counter.block(0);
-        if (counter.getrc() != BKException.Code.OK) {
-            LOG.error("Error while creating ledger : {}", counter.getrc());
-            throw BKException.create(counter.getrc());
-        } else if (counter.getLh() == null) {
+        LedgerHandle lh = SynchCallbackUtils.waitForResult(counter);
+        if (lh == null) {
             LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
             throw BKException.create(BKException.Code.UnexpectedConditionException);
         }
-
-        return counter.getLh();
+        return lh;
     }
 
     /**
@@ -682,27 +677,20 @@ public class BookKeeper implements AutoCloseable {
     public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize,
                                         DigestType digestType, byte passwd[], final Map<String, byte[]> customMetadata)
             throws InterruptedException, BKException {
-        SyncCounter counter = new SyncCounter();
-        counter.inc();
+        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+
         /*
          * Calls asynchronous version
          */
         asyncCreateLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
                              new SyncCreateCallback(), counter, customMetadata);
 
-        /*
-         * Wait
-         */
-        counter.block(0);
-        if (counter.getrc() != BKException.Code.OK) {
-            LOG.error("Error while creating ledger : {}", counter.getrc());
-            throw BKException.create(counter.getrc());
-        } else if (counter.getLh() == null) {
+        LedgerHandle lh = SynchCallbackUtils.waitForResult(counter);
+        if (lh == null) {
             LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
             throw BKException.create(BKException.Code.UnexpectedConditionException);
         }
-
-        return counter.getLh();
+        return lh;
     }
 
     /**
@@ -855,22 +843,14 @@ public class BookKeeper implements AutoCloseable {
 
     public LedgerHandle openLedger(long lId, DigestType digestType, byte passwd[])
             throws BKException, InterruptedException {
-        SyncCounter counter = new SyncCounter();
-        counter.inc();
+        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
 
         /*
          * Calls async open ledger
          */
         asyncOpenLedger(lId, digestType, passwd, new SyncOpenCallback(), counter);
 
-        /*
-         * Wait
-         */
-        counter.block(0);
-        if (counter.getrc() != BKException.Code.OK)
-            throw BKException.create(counter.getrc());
-
-        return counter.getLh();
+        return SynchCallbackUtils.waitForResult(counter);
     }
 
     /**
@@ -890,8 +870,7 @@ public class BookKeeper implements AutoCloseable {
 
     public LedgerHandle openLedgerNoRecovery(long lId, DigestType digestType, byte passwd[])
             throws BKException, InterruptedException {
-        SyncCounter counter = new SyncCounter();
-        counter.inc();
+        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
 
         /*
          * Calls async open ledger
@@ -899,14 +878,7 @@ public class BookKeeper implements AutoCloseable {
         asyncOpenLedgerNoRecovery(lId, digestType, passwd,
                                   new SyncOpenCallback(), counter);
 
-        /*
-         * Wait
-         */
-        counter.block(0);
-        if (counter.getrc() != BKException.Code.OK)
-            throw BKException.create(counter.getrc());
-
-        return counter.getLh();
+        return SynchCallbackUtils.waitForResult(counter);
     }
 
     /**
@@ -944,16 +916,11 @@ public class BookKeeper implements AutoCloseable {
      * @throws BKException
      */
     public void deleteLedger(long lId) throws InterruptedException, BKException {
-        SyncCounter counter = new SyncCounter();
-        counter.inc();
+        CompletableFuture<Void> counter = new CompletableFuture<>();
         // Call asynchronous version
         asyncDeleteLedger(lId, new SyncDeleteCallback(), counter);
-        // Wait
-        counter.block(0);
-        if (counter.getrc() != BKException.Code.OK) {
-            LOG.error("Error deleting ledger " + lId + " : " + counter.getrc());
-            throw BKException.create(counter.getrc());
-        }
+
+        SynchCallbackUtils.waitForResult(counter);
     }
 
     /**
@@ -1079,11 +1046,9 @@ public class BookKeeper implements AutoCloseable {
          *          optional control object
          */
         @Override
+        @SuppressWarnings("unchecked")
         public void createComplete(int rc, LedgerHandle lh, Object ctx) {
-            SyncCounter counter = (SyncCounter) ctx;
-            counter.setLh(lh);
-            counter.setrc(rc);
-            counter.dec();
+            SynchCallbackUtils.finish(rc, lh, (CompletableFuture<LedgerHandle>) ctx);
         }
     }
 
@@ -1099,14 +1064,9 @@ public class BookKeeper implements AutoCloseable {
          *          optional control object
          */
         @Override
+        @SuppressWarnings("unchecked")
         public void openComplete(int rc, LedgerHandle lh, Object ctx) {
-            SyncCounter counter = (SyncCounter) ctx;
-            counter.setLh(lh);
-
-            LOG.debug("Open complete: {}", rc);
-
-            counter.setrc(rc);
-            counter.dec();
+            SynchCallbackUtils.finish(rc, lh, (CompletableFuture<LedgerHandle>) ctx);
         }
     }
 
@@ -1120,10 +1080,9 @@ public class BookKeeper implements AutoCloseable {
          *            optional control object
          */
         @Override
+        @SuppressWarnings("unchecked")
         public void deleteComplete(int rc, Object ctx) {
-            SyncCounter counter = (SyncCounter) ctx;
-            counter.setrc(rc);
-            counter.dec();
+            SynchCallbackUtils.finish(rc, null, (CompletableFuture<Void>) ctx);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf4a4d6a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 022d4da..dd8fde4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -35,6 +36,8 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
@@ -260,18 +263,11 @@ public class BookKeeperAdmin {
      */
     public LedgerHandle openLedger(final long lId) throws InterruptedException,
             BKException {
-        SyncCounter counter = new SyncCounter();
-        counter.inc();
+        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+
         new LedgerOpenOp(bkc, lId, new SyncOpenCallback(), counter).initiate();
-        /*
-         * Wait
-         */
-        counter.block(0);
-        if (counter.getrc() != BKException.Code.OK) {
-            throw BKException.create(counter.getrc());
-        }
 
-        return counter.getLh();
+        return SynchCallbackUtils.waitForResult(counter);
     }
 
     /**
@@ -303,19 +299,12 @@ public class BookKeeperAdmin {
      */
     public LedgerHandle openLedgerNoRecovery(final long lId)
             throws InterruptedException, BKException {
-        SyncCounter counter = new SyncCounter();
-        counter.inc();
+        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+
         new LedgerOpenOp(bkc, lId, new SyncOpenCallback(), counter)
                 .initiateWithoutRecovery();
-        /*
-         * Wait
-         */
-        counter.block(0);
-        if (counter.getrc() != BKException.Code.OK) {
-            throw BKException.create(counter.getrc());
-        }
 
-        return counter.getLh();
+        return SynchCallbackUtils.waitForResult(counter);
     }
 
     /**
@@ -384,16 +373,13 @@ public class BookKeeperAdmin {
             }
             if (lastEntryId == -1 || nextEntryId <= lastEntryId) {
                 try {
-                    SyncCounter counter = new SyncCounter();
-                    counter.inc();
+                    CompletableFuture<Enumeration<LedgerEntry>> counter = new CompletableFuture<>();
 
                     handle.asyncReadEntriesInternal(nextEntryId, nextEntryId, new LedgerHandle.SyncReadCallback(),
                             counter);
-                    counter.block(0);
-                    if (counter.getrc() != BKException.Code.OK) {
-                        throw BKException.create(counter.getrc());
-                    }
-                    currentEntry = counter.getSequence().nextElement();
+
+                    currentEntry = SynchCallbackUtils.waitForResult(counter).nextElement();
+
                     return true;
                 } catch (Exception e) {
                     if (e instanceof BKException.BKNoSuchEntryException && lastEntryId == -1) {
@@ -862,31 +848,33 @@ public class BookKeeperAdmin {
             final LedgerFragment ledgerFragment,
             final BookieSocketAddress targetBookieAddress)
             throws InterruptedException, BKException {
-        SyncCounter syncCounter = new SyncCounter();
-        ResultCallBack resultCallBack = new ResultCallBack(syncCounter);
+        CompletableFuture<Void> counter = new CompletableFuture<>();
+        ResultCallBack resultCallBack = new ResultCallBack(counter);
         SingleFragmentCallback cb = new SingleFragmentCallback(resultCallBack,
                 lh, ledgerFragment.getFirstEntryId(), ledgerFragment
                         .getAddress(), targetBookieAddress);
-        syncCounter.inc();
+
         asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieAddress);
-        syncCounter.block(0);
-        if (syncCounter.getrc() != BKException.Code.OK) {
-            throw BKException.create(bkc.getReturnRc(syncCounter.getrc()));
+
+        try {
+            SynchCallbackUtils.waitForResult(counter);
+        } catch (BKException err) {
+            throw BKException.create(bkc.getReturnRc(err.getCode()));
         }
     }
 
     /** This is the class for getting the replication result */
     static class ResultCallBack implements AsyncCallback.VoidCallback {
-        private SyncCounter sync;
+        private final CompletableFuture<Void> sync;
 
-        public ResultCallBack(SyncCounter sync) {
+        public ResultCallBack(CompletableFuture<Void> sync) {
             this.sync = sync;
         }
 
         @Override
-        public void processResult(int rc, String s, Object obj) {
-            sync.setrc(rc);
-            sync.dec();
+        @SuppressWarnings("unchecked")
+        public void processResult(int rc, String s, Object ctx) {
+            SynchCallbackUtils.finish(rc, null, sync);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf4a4d6a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 06f84eb..5c33929 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -55,6 +55,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Ledger handle contains ledger metadata and is used to access the read and
@@ -266,15 +268,11 @@ public class LedgerHandle implements AutoCloseable {
      */
     public void close()
             throws InterruptedException, BKException {
-        SyncCounter counter = new SyncCounter();
-        counter.inc();
+        CompletableFuture<Void> counter = new CompletableFuture<>();
 
         asyncClose(new SyncCloseCallback(), counter);
 
-        counter.block(0);
-        if (counter.getrc() != BKException.Code.OK) {
-            throw BKException.create(counter.getrc());
-        }
+        SynchCallbackUtils.waitForResult(counter);
     }
 
     /**
@@ -461,17 +459,11 @@ public class LedgerHandle implements AutoCloseable {
      */
     public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
             throws InterruptedException, BKException {
-        SyncCounter counter = new SyncCounter();
-        counter.inc();
+        CompletableFuture<Enumeration<LedgerEntry>> counter = new CompletableFuture<>();
 
         asyncReadEntries(firstEntry, lastEntry, new SyncReadCallback(), counter);
 
-        counter.block(0);
-        if (counter.getrc() != BKException.Code.OK) {
-            throw BKException.create(counter.getrc());
-        }
-
-        return counter.getSequence();
+        return SynchCallbackUtils.waitForResult(counter);
     }
 
     /**
@@ -550,18 +542,12 @@ public class LedgerHandle implements AutoCloseable {
             throws InterruptedException, BKException {
         LOG.debug("Adding entry {}", data);
 
-        SyncCounter counter = new SyncCounter();
-        counter.inc();
+        CompletableFuture<Long> counter = new CompletableFuture<>();
 
         SyncAddCallback callback = new SyncAddCallback();
         asyncAddEntry(data, offset, length, callback, counter);
-        counter.block(0);
-
-        if (counter.getrc() != BKException.Code.OK) {
-            throw BKException.create(counter.getrc());
-        }
 
-        return callback.entryId;
+        return SynchCallbackUtils.waitForResult(counter);
     }
 
     /**
@@ -1353,21 +1339,14 @@ public class LedgerHandle implements AutoCloseable {
          *          control object
          */
         @Override
+        @SuppressWarnings("unchecked")
         public void readComplete(int rc, LedgerHandle lh,
                                  Enumeration<LedgerEntry> seq, Object ctx) {
-
-            SyncCounter counter = (SyncCounter) ctx;
-            synchronized (counter) {
-                counter.setSequence(seq);
-                counter.setrc(rc);
-                counter.dec();
-                counter.notify();
-            }
+            SynchCallbackUtils.finish(rc, seq, (CompletableFuture<Enumeration<LedgerEntry>>)ctx);
         }
     }
 
     static class SyncAddCallback implements AddCallback {
-        long entryId = -1;
 
         /**
          * Implementation of callback interface for synchronous read method.
@@ -1382,12 +1361,9 @@ public class LedgerHandle implements AutoCloseable {
          *          control object
          */
         @Override
+        @SuppressWarnings("unchecked")
         public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
-            SyncCounter counter = (SyncCounter) ctx;
-
-            this.entryId = entry;
-            counter.setrc(rc);
-            counter.dec();
+            SynchCallbackUtils.finish(rc, entry, (CompletableFuture<Long>)ctx);
         }
     }
 
@@ -1416,13 +1392,9 @@ public class LedgerHandle implements AutoCloseable {
          * @param ctx
          */
         @Override
+        @SuppressWarnings("unchecked")
         public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-            SyncCounter counter = (SyncCounter) ctx;
-            counter.setrc(rc);
-            synchronized (counter) {
-                counter.dec();
-                counter.notify();
-            }
+            SynchCallbackUtils.finish(rc, null, (CompletableFuture<Void>)ctx);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf4a4d6a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index c69a0e5..4a7de57 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.client;
 import java.io.Serializable;
 import java.security.GeneralSecurityException;
 import java.util.Comparator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 
@@ -90,18 +92,16 @@ public class LedgerHandleAdv extends LedgerHandle {
             BKException {
         LOG.debug("Adding entry {}", data);
 
-        SyncCounter counter = new SyncCounter();
-        counter.inc();
+        CompletableFuture<Long> counter = new CompletableFuture<>();
 
         SyncAddCallback callback = new SyncAddCallback();
         asyncAddEntry(entryId, data, offset, length, callback, counter);
 
-        counter.block(0);
-
-        if (counter.getrc() != BKException.Code.OK) {
-            throw BKException.create(counter.getrc());
+        try {
+            return counter.get();
+        } catch (ExecutionException err) {
+            throw (BKException) err.getCause();
         }
-        return callback.entryId;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf4a4d6a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SynchCallbackUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SynchCallbackUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SynchCallbackUtils.java
new file mode 100644
index 0000000..d1ef9e4
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SynchCallbackUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Helpers that let blocking client methods wait on callback-based asynchronous operations
+ * by carrying the outcome of the operation in a CompletableFuture.
+ */
+public class SynchCallbackUtils {
+
+    /**
+     * Block until the future completes and return its value. A failure whose cause is not a
+     * BKException is rethrown as an UnexpectedConditionException BKException with that cause.
+     * @param <T> type of the value carried by the future
+     * @param future future to wait on
+     * @return the value the future was completed with
+     * @throws InterruptedException if the current thread is interrupted while waiting
+     * @throws BKException if the future completed exceptionally
+     */
+    public static <T> T waitForResult(CompletableFuture<T> future) throws InterruptedException, BKException {
+        try {
+            return future.get();
+        } catch (ExecutionException err) {
+            if (err.getCause() instanceof BKException) {
+                throw (BKException) err.getCause();
+            } else {
+                BKException unexpectedConditionException
+                    = BKException.create(BKException.Code.UnexpectedConditionException);
+                unexpectedConditionException.initCause(err.getCause());
+                throw unexpectedConditionException;
+            }
+
+        }
+    }
+
+    /**
+     * Complete the future from a callback: normally when rc is OK, else with BKException.create(rc).
+     * @param <T> type of the result
+     * @param rc return code passed to the callback
+     * @param result value to complete the future with when rc is BKException.Code.OK
+     * @param future future to complete
+     */
+    public static <T> void finish(int rc, T result, CompletableFuture<T> future) {
+        if (rc != BKException.Code.OK) {
+            future.completeExceptionally(BKException.create(rc).fillInStackTrace());
+        } else {
+            future.complete(result);
+        }
+    }
+
+}