You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/27 06:02:30 UTC

[GitHub] eolivelli closed pull request #1288: Add sync variants of all methods in handles

eolivelli closed pull request #1288: Add sync variants of all methods in handles
URL: https://github.com/apache/bookkeeper/pull/1288
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index eebe43ce9..8db566739 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -389,14 +389,14 @@ void writeLedgerConfig(GenericCallback<Void> writeCb) {
     @Override
     public void close()
             throws InterruptedException, BKException {
-        SyncCallbackUtils.waitForResult(asyncClose());
+        SyncCallbackUtils.waitForResult(closeAsync());
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public CompletableFuture<Void> asyncClose() {
+    public CompletableFuture<Void> closeAsync() {
         CompletableFuture<Void> result = new CompletableFuture<>();
         SyncCloseCallback callback = new SyncCloseCallback(result);
         asyncClose(callback, null);
@@ -707,7 +707,7 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal
      *          id of last entry of sequence
      */
     @Override
-    public CompletableFuture<LedgerEntries> read(long firstEntry, long lastEntry) {
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
         // Little sanity check
         if (firstEntry < 0 || firstEntry > lastEntry) {
             LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}",
@@ -748,7 +748,7 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal
      * @see #readUnconfirmedEntries(long, long)
      */
     @Override
-    public CompletableFuture<LedgerEntries> readUnconfirmed(long firstEntry, long lastEntry) {
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
         // Little sanity check
         if (firstEntry < 0 || firstEntry > lastEntry) {
             LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}",
@@ -852,7 +852,7 @@ public long addEntry(byte[] data) throws InterruptedException, BKException {
      * {@inheritDoc}
      */
     @Override
-    public CompletableFuture<Long> append(ByteBuf data) {
+    public CompletableFuture<Long> appendAsync(ByteBuf data) {
         SyncAddCallback callback = new SyncAddCallback();
         asyncAddEntry(data, callback, null);
         return callback;
@@ -1206,7 +1206,7 @@ public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData dat
      * {@inheritDoc}
      */
     @Override
-    public CompletableFuture<Long> tryReadLastAddConfirmed() {
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
         FutureReadLastConfirmed result = new FutureReadLastConfirmed();
         asyncTryReadLastConfirmed(result, null);
         return result;
@@ -1216,7 +1216,7 @@ public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData dat
      * {@inheritDoc}
      */
     @Override
-    public CompletableFuture<Long> readLastAddConfirmed() {
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
         FutureReadLastConfirmed result = new FutureReadLastConfirmed();
         asyncReadLastConfirmed(result, null);
         return result;
@@ -1226,9 +1226,9 @@ public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData dat
      * {@inheritDoc}
      */
     @Override
-    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntry(long entryId,
-                                                                                 long timeOutInMillis,
-                                                                                 boolean parallel) {
+    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                                      long timeOutInMillis,
+                                                                                      boolean parallel) {
         FutureReadLastConfirmedAndEntry result = new FutureReadLastConfirmedAndEntry();
         asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel, result, null);
         return result;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 80581032b..afd56cf2e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -246,7 +246,7 @@ public String toString() {
     }
 
     @Override
-    public CompletableFuture<Long> write(long entryId, ByteBuf data) {
+    public CompletableFuture<Long> writeAsync(long entryId, ByteBuf data) {
         SyncAddCallback callback = new SyncAddCallback();
         asyncAddEntry(entryId, data, callback, data);
         return callback;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
index 3a51716f2..b81f33de7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
@@ -16,6 +16,7 @@
 package org.apache.bookkeeper.client.api;
 
 import java.lang.reflect.Field;
+import java.util.function.Function;
 
 import org.apache.bookkeeper.client.LedgerHandleAdv;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
@@ -28,7 +29,20 @@
  */
 @Public
 @Unstable
-public abstract class BKException extends Exception {
+public class BKException extends Exception {
+    static final Function<Throwable, BKException> HANDLER = cause -> {
+        if (cause == null) {
+            return null;
+        }
+        if (cause instanceof BKException) {
+            return (BKException) cause;
+        } else {
+            BKException ex = new BKException(Code.UnexpectedConditionException);
+            ex.initCause(cause);
+            return ex;
+        }
+    };
+
     protected final int code;
 
     private static final LogMessagePool logMessagePool = new LogMessagePool();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/Handle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/Handle.java
index f520e61ed..1f2f8ca51 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/Handle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/Handle.java
@@ -21,7 +21,6 @@
 package org.apache.bookkeeper.client.api;
 
 import java.util.concurrent.CompletableFuture;
-import lombok.SneakyThrows;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -43,18 +42,24 @@
     long getId();
 
     /**
-     * Close this ledger synchronously.
+     * Close this handle synchronously.
      *
      * @throws org.apache.bookkeeper.client.api.BKException
      * @throws java.lang.InterruptedException
-     * @see #asyncClose
+     * @see #closeAsync
      */
     @Override
-    @SneakyThrows(Exception.class)
     default void close() throws BKException, InterruptedException {
-        FutureUtils.result(asyncClose());
+        FutureUtils.<Void, BKException>result(closeAsync(), BKException.HANDLER);
     }
 
+    /**
+     * Asynchronous close the handle.
+     *
+     * @return an handle to access the result of the operation
+     */
+    CompletableFuture<Void> closeAsync();
+
     /**
      * Returns the metadata of this ledger.
      *
@@ -65,18 +70,4 @@ default void close() throws BKException, InterruptedException {
      * @return the metadata of this ledger.
      */
     LedgerMetadata getLedgerMetadata();
-
-    /**
-     * Asynchronous close, any adds in flight will return errors.
-     *
-     * <p>Closing a ledger will ensure that all clients agree on what the last
-     * entry of the ledger is. This ensures that, once the ledger has been closed,
-     * all reads from the ledger will return the same set of entries.
-     *
-     * @return an handle to access the result of the operation
-     *
-     * @see FutureUtils#result(java.util.concurrent.CompletableFuture) to have a simple method to access the result
-     */
-    CompletableFuture<Void> asyncClose();
-
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
index 48baf1bb2..04533dc24 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 
 /**
  * Provide read access to a ledger.
@@ -42,7 +43,21 @@
      *          id of last entry of sequence, inclusive
      * @return an handle to the result of the operation
      */
-    CompletableFuture<LedgerEntries> read(long firstEntry, long lastEntry);
+    CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry);
+
+    /**
+     * Read a sequence of entries synchronously.
+     *
+     * @param firstEntry
+     *          id of first entry of sequence
+     * @param lastEntry
+     *          id of last entry of sequence, inclusive
+     * @return the result of the operation
+     */
+    default LedgerEntries read(long firstEntry, long lastEntry) throws BKException, InterruptedException {
+        return FutureUtils.<LedgerEntries, BKException>result(readAsync(firstEntry, lastEntry),
+                                                              BKException.HANDLER);
+    }
 
     /**
      * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range.
@@ -64,10 +79,27 @@
      *          id of last entry of sequence, inclusive
      * @return an handle to the result of the operation
      *
-     * @see #read(long, long)
-     * @see #readLastAddConfirmed()
+     * @see #readAsync(long, long)
+     * @see #readLastAddConfirmedAsync()
+     */
+    CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry);
+
+    /**
+     * Read a sequence of entries synchronously.
+     *
+     * @param firstEntry
+     *          id of first entry of sequence
+     * @param lastEntry
+     *          id of last entry of sequence, inclusive
+     * @return an handle to the result of the operation
+     *
+     * @see #readUnconfirmedAsync(long, long)
      */
-    CompletableFuture<LedgerEntries> readUnconfirmed(long firstEntry, long lastEntry);
+    default LedgerEntries readUnconfirmed(long firstEntry, long lastEntry)
+            throws BKException, InterruptedException {
+        return FutureUtils.<LedgerEntries, BKException>result(readUnconfirmedAsync(firstEntry, lastEntry),
+                                                              BKException.HANDLER);
+    }
 
     /**
      * Obtains asynchronously the last confirmed write from a quorum of bookies. This
@@ -80,9 +112,20 @@
      *
      * @return an handle to the result of the operation
      * @see #getLastAddConfirmed()
+     */
+    CompletableFuture<Long> readLastAddConfirmedAsync();
+
+    /**
+     * Obtains asynchronously the last confirmed write from a quorum of bookies.
      *
+     * @return the result of the operation
+     * @see #readLastAddConfirmedAsync()
      */
-    CompletableFuture<Long> readLastAddConfirmed();
+    default long readLastAddConfirmed() throws BKException, InterruptedException {
+        return FutureUtils.<Long, BKException>result(readLastAddConfirmedAsync(),
+                                                     BKException.HANDLER);
+    }
+
 
     /**
      * Obtains asynchronously the last confirmed write from a quorum of bookies
@@ -90,10 +133,20 @@
      * immediately if it received a LAC which is larger than current LAC.
      *
      * @return an handle to the result of the operation
-     * @see #tryReadLastAddConfirmed()
+     */
+    CompletableFuture<Long> tryReadLastAddConfirmedAsync();
+
+    /**
+     * Obtains asynchronously the last confirmed write from a quorum of bookies
+     * but it doesn't wait all the responses from the quorum.
      *
+     * @return the result of the operation
+     * @see #tryReadLastAddConfirmedAsync()
      */
-    CompletableFuture<Long> tryReadLastAddConfirmed();
+    default long tryReadLastAddConfirmed() throws BKException, InterruptedException {
+        return FutureUtils.<Long, BKException>result(tryReadLastAddConfirmedAsync(),
+                                                     BKException.HANDLER);
+    }
 
     /**
      * Get the last confirmed entry id on this ledger. It reads the local state of the ledger handle,
@@ -144,8 +197,30 @@
      *          whether to issue the long poll reads in parallel
      * @return an handle to the result of the operation
      */
-    CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntry(long entryId,
-                                                                          long timeOutInMillis,
-                                                                          boolean parallel);
+    CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                               long timeOutInMillis,
+                                                                               boolean parallel);
+
+    /**
+     * Asynchronous read specific entry and the latest last add confirmed.
+     *
+     * @param entryId
+     *          next entry id to read
+     * @param timeOutInMillis
+     *          timeout period to wait for the entry id to be available (for long poll only)
+     *          if timeout for get the entry, it will return null entry.
+     * @param parallel
+     *          whether to issue the long poll reads in parallel
+     * @return the result of the operation
+     * @see #readLastAddConfirmedAndEntry(long, long, boolean)
+     */
+    default LastConfirmedAndEntry readLastAddConfirmedAndEntry(long entryId,
+                                                               long timeOutInMillis,
+                                                               boolean parallel)
+            throws BKException, InterruptedException {
+        return FutureUtils.<LastConfirmedAndEntry, BKException>result(
+                readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel),
+                BKException.HANDLER);
+    }
 
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
index 037f7dd72..37f45b9b7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java
@@ -26,6 +26,7 @@
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 
 /**
  * Provide write access to a ledger. Using WriteAdvHandler the writer MUST explictly set an entryId. Beware that the
@@ -47,7 +48,19 @@
      * @param data array of bytes to be written
      * @return an handle to the result, in case of success it will return the same value of param entryId.
      */
-    default CompletableFuture<Long> write(final long entryId, final ByteBuffer data) {
+    default CompletableFuture<Long> writeAsync(final long entryId, final ByteBuffer data) {
+        return writeAsync(entryId, Unpooled.wrappedBuffer(data));
+    }
+
+    /**
+     * Add entry synchronously to an open ledger.
+     *
+     * @param entryId entryId to be added
+     * @param data array of bytes to be written
+     * @return the same value of param entryId.
+     */
+    default long write(final long entryId, final ByteBuffer data)
+            throws BKException, InterruptedException {
         return write(entryId, Unpooled.wrappedBuffer(data));
     }
 
@@ -58,7 +71,19 @@
      * @param data array of bytes to be written
      * @return an handle to the result, in case of success it will return the same value of param {@code entryId}.
      */
-    default CompletableFuture<Long> write(final long entryId, final byte[] data) {
+    default CompletableFuture<Long> writeAsync(final long entryId, final byte[] data) {
+        return writeAsync(entryId, Unpooled.wrappedBuffer(data));
+    }
+
+    /**
+     * Add entry synchronously to an open ledger.
+     *
+     * @param entryId entryId to be added.
+     * @param data array of bytes to be written
+     * @return same value of param {@code entryId}.
+     */
+    default long write(final long entryId, final byte[] data)
+            throws BKException, InterruptedException {
         return write(entryId, Unpooled.wrappedBuffer(data));
     }
 
@@ -71,7 +96,21 @@
      * @param length the length to data to write
      * @return an handle to the result, in case of success it will return the same value of param {@code entryId}.
      */
-    default CompletableFuture<Long> write(final long entryId, final byte[] data, int offset, int length) {
+    default CompletableFuture<Long> writeAsync(final long entryId, final byte[] data, int offset, int length) {
+        return writeAsync(entryId, Unpooled.wrappedBuffer(data, offset, length));
+    }
+
+    /**
+     * Add entry synchronously to an open ledger.
+     *
+     * @param entryId entryId to  be added.
+     * @param data array of bytes to be written
+     * @param offset the offset of the bytes array
+     * @param length the length to data to write
+     * @return the same value of param {@code entryId}.
+     */
+    default long write(final long entryId, final byte[] data, int offset, int length)
+            throws BKException, InterruptedException {
         return write(entryId, Unpooled.wrappedBuffer(data, offset, length));
     }
 
@@ -82,6 +121,16 @@
      * @param data array of bytes to be written
      * @return an handle to the result, in case of success it will return the same value of param entryId
      */
-    CompletableFuture<Long> write(long entryId, ByteBuf data);
+    CompletableFuture<Long> writeAsync(long entryId, ByteBuf data);
 
+    /**
+     * Add entry asynchronously to an open ledger.
+     *
+     * @param entryId entryId to be added
+     * @param data array of bytes to be written
+     * @return the same value of param entryId
+     */
+    default long write(long entryId, ByteBuf data) throws BKException, InterruptedException {
+        return FutureUtils.<Long, BKException>result(writeAsync(entryId, data), BKException.HANDLER);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
index cefd74916..b2c04597f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java
@@ -24,8 +24,10 @@
 import io.netty.buffer.Unpooled;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
+
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 
 /**
  * Provide write access to a ledger.
@@ -45,7 +47,18 @@
      *             completable future is returned
      * @return an handle to the result, in case of success it will return the id of the newly appended entry
      */
-    CompletableFuture<Long> append(ByteBuf data);
+    CompletableFuture<Long> appendAsync(ByteBuf data);
+
+    /**
+     * Add entry synchronously to an open ledger.
+     *
+     * @param data a bytebuf to be written. The bytebuf's reference count will be decremented by 1 after the
+     *             call completes.
+     * @return the id of the newly appended entry
+     */
+    default long append(ByteBuf data) throws BKException, InterruptedException {
+        return FutureUtils.<Long, BKException>result(appendAsync(data), BKException.HANDLER);
+    }
 
     /**
      * Add entry asynchronously to an open ledger.
@@ -53,7 +66,17 @@
      * @param data array of bytes to be written
      * @return an handle to the result, in case of success it will return the id of the newly appended entry
      */
-    default CompletableFuture<Long> append(ByteBuffer data) {
+    default CompletableFuture<Long> appendAsync(ByteBuffer data) {
+        return appendAsync(Unpooled.wrappedBuffer(data));
+    }
+
+    /**
+     * Add entry synchronously to an open ledger.
+     *
+     * @param data array of bytes to be written
+     * @return the id of the newly appended entry
+     */
+    default long append(ByteBuffer data) throws BKException, InterruptedException {
         return append(Unpooled.wrappedBuffer(data));
     }
 
@@ -64,7 +87,17 @@
      * @return a completable future represents the add result, in case of success the future returns the entry id
      *         of this newly appended entry
      */
-    default CompletableFuture<Long> append(byte[] data) {
+    default CompletableFuture<Long> appendAsync(byte[] data) {
+        return appendAsync(Unpooled.wrappedBuffer(data));
+    }
+
+    /**
+     * Add an entry synchronously to an open ledger.
+     *
+     * @param data array of bytes to be written
+     * @return the entry id of this newly appended entry
+     */
+    default long append(byte[] data) throws BKException, InterruptedException {
         return append(Unpooled.wrappedBuffer(data));
     }
 
@@ -77,7 +110,19 @@
      * @return a completable future represents the add result, in case of success the future returns the entry id
      *         of this newly appended entry
      */
-    default CompletableFuture<Long> append(byte[] data, int offset, int length) {
+    default CompletableFuture<Long> appendAsync(byte[] data, int offset, int length) {
+        return appendAsync(Unpooled.wrappedBuffer(data, offset, length));
+    }
+
+    /**
+     * Add an entry synchronously to an open ledger.
+     *
+     * @param data array of bytes to be written
+     * @param offset the offset in the bytes array
+     * @param length the length of the bytes to be appended
+     * @return the entry id of this newly appended entry
+     */
+    default long append(byte[] data, int offset, int length) throws BKException, InterruptedException {
         return append(Unpooled.wrappedBuffer(data, offset, length));
     }
 
@@ -89,4 +134,27 @@
      */
     long getLastAddPushed();
 
+    /**
+     * Asynchronous close the write handle, any adds in flight will return errors.
+     *
+     * <p>Closing a ledger will ensure that all clients agree on what the last
+     * entry of the ledger is. Once the ledger has been closed, all reads from the
+     * ledger will return the same set of entries.
+     *
+     * @return an handle to access the result of the operation
+     */
+    @Override
+    CompletableFuture<Void> closeAsync();
+
+    /**
+     * Synchronous close the write handle, any adds in flight will return errors.
+     *
+     * <p>Closing a ledger will ensure that all clients agree on what the last
+     * entry of the ledger is. Once the ledger has been closed, all reads from the
+     * ledger will return the same set of entries.
+     */
+    @Override
+    default void close() throws BKException, InterruptedException {
+        FutureUtils.<Void, BKException>result(closeAsync(), BKException.HANDLER);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommand.java
index 60633ebaf..d54486f0c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommand.java
@@ -67,7 +67,7 @@ protected void run(BookKeeper bk) throws Exception {
             System.out.println("Ledger ID: " + wh.getId());
             long lastReport = System.nanoTime();
             for (int i = 0; i < numEntries; i++) {
-                result(wh.append(data));
+                wh.append(data);
                 if (TimeUnit.SECONDS.convert(System.nanoTime() - lastReport,
                         TimeUnit.NANOSECONDS) > 1) {
                     System.out.println(i + " entries written");
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 9a63e7081..1b061f0be 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -617,7 +617,7 @@ public void testLedgerCreateAdvWithLedgerIdInLoop() throws Exception {
                                     long entryId = i++;
                                     LOG.info("Writing {}:{} as {}",
                                              ledgerId, entryId, entry.slice().readInt());
-                                    lastRequest = writer.write(entryId, entry);
+                                    lastRequest = writer.writeAsync(entryId, entry);
                                 }
                                 lastRequest.join();
                                 return Pair.of(writer, entries);
@@ -1218,7 +1218,7 @@ private void readEntries(LedgerHandle lh, List<byte[]> entries) throws Interrupt
     private void readEntries(ReadHandle reader, List<ByteBuf> entries) throws Exception {
         assertEquals("Not enough entries in ledger " + reader.getId(),
                      reader.getLastAddConfirmed(), entries.size() - 1);
-        try (LedgerEntries readEntries = reader.read(0, reader.getLastAddConfirmed()).join()) {
+        try (LedgerEntries readEntries = reader.read(0, reader.getLastAddConfirmed())) {
             int i = 0;
             for (org.apache.bookkeeper.client.api.LedgerEntry e : readEntries) {
                 int entryId = i++;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
index 2a07e1d63..0da50b57c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
@@ -53,7 +53,7 @@ public void testChangeEnsembleMaxNumWithWriter() throws Exception {
             lId = writer.getId();
             //first fragment
             for (int i = 0; i < numEntries; i++) {
-                result(writer.append(ByteBuffer.wrap(data)));
+                writer.append(ByteBuffer.wrap(data));
             }
             assertEquals("There should be zero ensemble change",
                     1, getLedgerMetadata(lId).getEnsembles().size());
@@ -65,7 +65,7 @@ public void testChangeEnsembleMaxNumWithWriter() throws Exception {
             killBookie(writer.getLedgerMetadata().getEnsembleAt(writer.getLastAddConfirmed()).get(0));
             // add failure
             try {
-                result(writer.append(ByteBuffer.wrap(data)));
+                writer.append(ByteBuffer.wrap(data));
                 fail("should not come to here");
             } catch (BKException exception){
                 assertEquals(exception.getCode(), WriteException);
@@ -82,11 +82,11 @@ private void simulateEnsembleChangeWithWriter(int changeNum, int numEntries, Wri
 
             killBookie(writer.getLedgerMetadata().getEnsembleAt(writer.getLastAddConfirmed()).get(0));
             for (int i = 0; i < numEntries; i++) {
-                result(writer.append(ByteBuffer.wrap(data)));
+                writer.append(ByteBuffer.wrap(data));
             }
             // ensure there is a ensemble changed
             assertEquals("There should be one ensemble change",
                     expectedSize + num, writer.getLedgerMetadata().getAllEnsembles().size());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
index eadabfb9a..93fc4ec2b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
@@ -72,11 +72,11 @@ public void testWriteHandle() throws Exception {
                 .execute())) {
 
             // test writer is able to write
-            result(writer.append(ByteBuffer.wrap(data)));
+            writer.append(ByteBuffer.wrap(data));
             assertEquals(0L, writer.getLastAddPushed());
-            result(writer.append(Unpooled.wrappedBuffer(data)));
+            writer.append(Unpooled.wrappedBuffer(data));
             assertEquals(1L, writer.getLastAddPushed());
-            long expectedEntryId = result(writer.append(ByteBuffer.wrap(data)));
+            long expectedEntryId = writer.append(ByteBuffer.wrap(data));
             assertEquals(expectedEntryId, writer.getLastAddConfirmed());
             assertEquals(3 * data.length, writer.getLength());
         }
@@ -97,9 +97,9 @@ public void testWriteAdvHandle() throws Exception {
 
             // test writer is able to write
             long entryId = 0;
-            result(writer.write(entryId++, ByteBuffer.wrap(data)));
-            result(writer.write(entryId++, Unpooled.wrappedBuffer(data)));
-            long expectedEntryId = result(writer.write(entryId++, ByteBuffer.wrap(data)));
+            writer.write(entryId++, ByteBuffer.wrap(data));
+            writer.write(entryId++, Unpooled.wrappedBuffer(data));
+            long expectedEntryId = writer.write(entryId++, ByteBuffer.wrap(data));
             assertEquals(expectedEntryId, writer.getLastAddConfirmed());
             assertEquals(3 * data.length, writer.getLength());
         }
@@ -120,9 +120,9 @@ public void testWriteAdvHandleWithFixedLedgerId() throws Exception {
 
             // test writer is able to write
             long entryId = 0;
-            result(writer.write(entryId++, ByteBuffer.wrap(data)));
-            result(writer.write(entryId++, Unpooled.wrappedBuffer(data)));
-            long expectedEntryId = result(writer.write(entryId++, ByteBuffer.wrap(data)));
+            writer.write(entryId++, ByteBuffer.wrap(data));
+            writer.write(entryId++, Unpooled.wrappedBuffer(data));
+            long expectedEntryId = writer.write(entryId++, ByteBuffer.wrap(data));
             assertEquals(expectedEntryId, writer.getLastAddConfirmed());
             assertEquals(3 * data.length, writer.getLength());
         }
@@ -140,9 +140,9 @@ public void testWriteAdvHandleBKDuplicateEntryId() throws Exception {
                 .execute())) {
             assertEquals(1234, writer.getId());
             long entryId = 0;
-            result(writer.write(entryId++, ByteBuffer.wrap(data)));
+            writer.write(entryId++, ByteBuffer.wrap(data));
             assertEquals(data.length, writer.getLength());
-            result(writer.write(entryId - 1, ByteBuffer.wrap(data)));
+            writer.write(entryId - 1, ByteBuffer.wrap(data));
         }
     }
 
@@ -183,7 +183,7 @@ public void testLedgerDigests() throws Exception {
                     .execute())) {
                 lId = writer.getId();
                 assertEquals(-1L, writer.getLastAddPushed());
-                result(writer.append(ByteBuffer.wrap(bigData)));
+                writer.append(ByteBuffer.wrap(bigData));
                 assertEquals(bigData.length, writer.getLength());
             }
             try (ReadHandle reader = result(newOpenLedgerOp()
@@ -191,7 +191,7 @@ public void testLedgerDigests() throws Exception {
                     .withPassword(password)
                     .withLedgerId(lId)
                     .execute())) {
-                LedgerEntries entries = result(reader.read(0, 0));
+                LedgerEntries entries = reader.read(0, 0);
                 checkEntries(entries, bigData);
             }
             result(newDeleteLedgerOp().withLedgerId(lId).execute());
@@ -250,8 +250,8 @@ public void testOpenLedgerNoSealed() throws Exception {
                 .execute())) {
             long lId = writer.getId();
             // write data and populate LastAddConfirmed
-            result(writer.append(ByteBuffer.wrap(data)));
-            result(writer.append(ByteBuffer.wrap(data)));
+            writer.append(ByteBuffer.wrap(data));
+            writer.append(ByteBuffer.wrap(data));
 
             try (ReadHandle reader = result(newOpenLedgerOp()
                     .withPassword(password)
@@ -274,9 +274,9 @@ public void testOpenLedgerRead() throws Exception {
                 .execute())) {
             lId = writer.getId();
             // write data and populate LastAddConfirmed
-            result(writer.append(ByteBuffer.wrap(data)));
-            result(writer.append(ByteBuffer.wrap(data)));
-            result(writer.append(ByteBuffer.wrap(data)));
+            writer.append(ByteBuffer.wrap(data));
+            writer.append(ByteBuffer.wrap(data));
+            writer.append(ByteBuffer.wrap(data));
         }
 
         try (ReadHandle reader = result(newOpenLedgerOp()
@@ -287,14 +287,14 @@ public void testOpenLedgerRead() throws Exception {
             assertTrue(reader.isClosed());
             assertEquals(2, reader.getLastAddConfirmed());
             assertEquals(3 * data.length, reader.getLength());
-            assertEquals(2, result(reader.readLastAddConfirmed()).intValue());
-            assertEquals(2, result(reader.tryReadLastAddConfirmed()).intValue());
-            checkEntries(result(reader.read(0, reader.getLastAddConfirmed())), data);
-            checkEntries(result(reader.readUnconfirmed(0, reader.getLastAddConfirmed())), data);
+            assertEquals(2, reader.readLastAddConfirmed());
+            assertEquals(2, reader.tryReadLastAddConfirmed());
+            checkEntries(reader.read(0, reader.getLastAddConfirmed()), data);
+            checkEntries(reader.readUnconfirmed(0, reader.getLastAddConfirmed()), data);
 
             // test readLastAddConfirmedAndEntry
             LastConfirmedAndEntry lastConfirmedAndEntry =
-                result(reader.readLastAddConfirmedAndEntry(0, 999, false));
+                reader.readLastAddConfirmedAndEntry(0, 999, false);
             assertEquals(2L, lastConfirmedAndEntry.getLastAddConfirmed());
             assertArrayEquals(data, lastConfirmedAndEntry.getEntry().getEntryBytes());
             lastConfirmedAndEntry.close();
@@ -320,8 +320,8 @@ public void testOpenLedgerWithRecovery() throws Exception {
             .execute())) {
             lId = writer.getId();
 
-            result(writer.append(ByteBuffer.wrap(data)));
-            result(writer.append(ByteBuffer.wrap(data)));
+            writer.append(ByteBuffer.wrap(data));
+            writer.append(ByteBuffer.wrap(data));
             assertEquals(1L, writer.getLastAddPushed());
 
             // open with fencing
@@ -334,7 +334,7 @@ public void testOpenLedgerWithRecovery() throws Exception {
                 assertEquals(1L, reader.getLastAddConfirmed());
             }
 
-            result(writer.append(ByteBuffer.wrap(data)));
+            writer.append(ByteBuffer.wrap(data));
 
         }
     }
@@ -383,9 +383,9 @@ public void testLedgerEntriesIterable() throws Exception {
                 .execute().get()) {
             lId = writer.getId();
             // write data and populate LastAddConfirmed
-            result(writer.append(ByteBuffer.wrap(data)));
-            result(writer.append(ByteBuffer.wrap(data)));
-            result(writer.append(ByteBuffer.wrap(data)));
+            writer.append(ByteBuffer.wrap(data));
+            writer.append(ByteBuffer.wrap(data));
+            writer.append(ByteBuffer.wrap(data));
         }
 
         try (ReadHandle reader = newOpenLedgerOp()
@@ -396,7 +396,7 @@ public void testLedgerEntriesIterable() throws Exception {
             long lac = reader.getLastAddConfirmed();
             assertEquals(2, lac);
 
-            try (LedgerEntries entries = reader.read(0, lac).get()) {
+            try (LedgerEntries entries = reader.read(0, lac)) {
                 AtomicLong i = new AtomicLong(0);
                 for (LedgerEntry e : entries) {
                     assertEquals(i.getAndIncrement(), e.getEntryId());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteAdvHandleTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteAdvHandleTest.java
index 24408d98c..585a7b21e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteAdvHandleTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteAdvHandleTest.java
@@ -59,47 +59,47 @@ public WriteAdvHandleTest() {
             ByteBuf buf = invocationOnMock.getArgument(1);
             entryQueue.add(buf);
             return FutureUtils.value(-1L);
-        }).when(handle).write(anyLong(), any(ByteBuf.class));
-        when(handle.write(anyLong(), any(byte[].class))).thenCallRealMethod();
-        when(handle.write(anyLong(), any(byte[].class), anyInt(), anyInt())).thenCallRealMethod();
-        when(handle.write(anyLong(), any(ByteBuffer.class))).thenCallRealMethod();
+        }).when(handle).writeAsync(anyLong(), any(ByteBuf.class));
+        when(handle.writeAsync(anyLong(), any(byte[].class))).thenCallRealMethod();
+        when(handle.writeAsync(anyLong(), any(byte[].class), anyInt(), anyInt())).thenCallRealMethod();
+        when(handle.writeAsync(anyLong(), any(ByteBuffer.class))).thenCallRealMethod();
     }
 
     @Test
     public void testAppendBytes() throws Exception {
         byte[] testData = runtime.getMethodName().getBytes(UTF_8);
-        handle.write(entryId, testData);
+        handle.writeAsync(entryId, testData);
 
         ByteBuf buffer = entryQueue.take();
         byte[] bufferData = ByteBufUtil.getBytes(buffer);
         assertArrayEquals(testData, bufferData);
-        verify(handle, times(1)).write(eq(entryId), any(ByteBuf.class));
+        verify(handle, times(1)).writeAsync(eq(entryId), any(ByteBuf.class));
     }
 
     @Test
     public void testAppendBytes2() throws Exception {
         byte[] testData = runtime.getMethodName().getBytes(UTF_8);
-        handle.write(entryId, testData, 1, testData.length / 2);
+        handle.writeAsync(entryId, testData, 1, testData.length / 2);
         byte[] expectedData = new byte[testData.length / 2];
         System.arraycopy(testData, 1, expectedData, 0, testData.length / 2);
 
         ByteBuf buffer = entryQueue.take();
         byte[] bufferData = ByteBufUtil.getBytes(buffer);
         assertArrayEquals(expectedData, bufferData);
-        verify(handle, times(1)).write(eq(entryId), any(ByteBuf.class));
+        verify(handle, times(1)).writeAsync(eq(entryId), any(ByteBuf.class));
     }
 
     @Test
     public void testAppendByteBuffer() throws Exception {
         byte[] testData = runtime.getMethodName().getBytes(UTF_8);
-        handle.write(entryId, ByteBuffer.wrap(testData, 1, testData.length / 2));
+        handle.writeAsync(entryId, ByteBuffer.wrap(testData, 1, testData.length / 2));
         byte[] expectedData = new byte[testData.length / 2];
         System.arraycopy(testData, 1, expectedData, 0, testData.length / 2);
 
         ByteBuf buffer = entryQueue.take();
         byte[] bufferData = ByteBufUtil.getBytes(buffer);
         assertArrayEquals(expectedData, bufferData);
-        verify(handle, times(1)).write(eq(entryId), any(ByteBuf.class));
+        verify(handle, times(1)).writeAsync(eq(entryId), any(ByteBuf.class));
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteHandleTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteHandleTest.java
index fd7ac5e4e..113f58576 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteHandleTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/WriteHandleTest.java
@@ -34,7 +34,6 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.LinkedBlockingQueue;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -51,12 +50,12 @@
     private final WriteHandle handle = mock(WriteHandle.class);
     private final LinkedBlockingQueue<ByteBuf> entryQueue;
 
-    public WriteHandleTest() {
+    public WriteHandleTest() throws Exception {
         this.entryQueue = new LinkedBlockingQueue<>();
         doAnswer(invocationOnMock -> {
             ByteBuf buf = invocationOnMock.getArgument(0);
             entryQueue.add(buf);
-            return FutureUtils.value(-1L);
+            return -1L;
         }).when(handle).append(any(ByteBuf.class));
         when(handle.append(any(byte[].class))).thenCallRealMethod();
         when(handle.append(any(byte[].class), anyInt(), anyInt())).thenCallRealMethod();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
index c3f1b8938..26b244820 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
@@ -65,7 +65,7 @@ public static void waitUntilLacUpdated(ReadHandle rh, long newLac) throws Except
         long lac = rh.getLastAddConfirmed();
         while (lac < newLac) {
             TimeUnit.MILLISECONDS.sleep(20);
-            lac = rh.readLastAddConfirmed().get();
+            lac = rh.readLastAddConfirmed();
         }
     }
 
diff --git a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommandTest.java b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommandTest.java
index 946afc552..a901f2182 100644
--- a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommandTest.java
+++ b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/SimpleTestCommandTest.java
@@ -41,7 +41,7 @@
 public class SimpleTestCommandTest extends ClientCommandTestBase {
 
     @Test
-    public void testCommandShortArgs() {
+    public void testCommandShortArgs() throws Exception {
         testCommand(
             "simpletest",
             "-e", "5",
@@ -51,7 +51,7 @@ public void testCommandShortArgs() {
     }
 
     @Test
-    public void testCommandLongArgs() {
+    public void testCommandLongArgs() throws Exception {
         testCommand(
             "simpletest",
             "--ensemble-size", "5",
@@ -60,10 +60,10 @@ public void testCommandLongArgs() {
             "--num-entries", "10");
     }
 
-    public void testCommand(String... args) {
+    public void testCommand(String... args) throws Exception {
         WriteHandle wh = mock(WriteHandle.class);
         AtomicLong counter = new AtomicLong(0L);
-        when(wh.append(any(byte[].class))).thenReturn(FutureUtils.value(counter.get()));
+        when(wh.append(any(byte[].class))).thenReturn(counter.get());
         CreateBuilder createBuilder = mock(CreateBuilder.class);
         when(createBuilder.execute())
             .thenReturn(FutureUtils.value(wh));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services