You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/02/18 08:44:50 UTC

[GitHub] sijie closed pull request #970: ISSUE #966: Expose quorum write complete latency to the client

sijie closed pull request #970: ISSUE #966: Expose quorum write complete latency to the client
URL: https://github.com/apache/bookkeeper/pull/970
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
index 7e19f4482..cde3f06ae 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
@@ -26,6 +26,30 @@
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public interface AsyncCallback {
+    /**
+     * Async Callback for adding entries to ledgers with latency information.
+     *
+     * @since 4.7
+     */
+    @InterfaceAudience.Public
+    @InterfaceStability.Evolving
+    interface AddCallbackWithLatency {
+        /**
+         * Callback declaration which additionally passes quorum write complete latency.
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param entryId
+         *          entry identifier
+         * @param qwcLatency
+         *          QuorumWriteComplete Latency
+         * @param ctx
+         *          context object
+         */
+        void addCompleteWithLatency(int rc, LedgerHandle lh, long entryId, long qwcLatency, Object ctx);
+    }
 
     /**
      * Async Callback for adding entries to ledgers.
@@ -34,9 +58,9 @@
      */
     @InterfaceAudience.Public
     @InterfaceStability.Stable
-    interface AddCallback {
+    interface AddCallback extends AddCallbackWithLatency {
         /**
-         * Callback declaration.
+         * Callback to implement if latency information is not desired.
          *
          * @param rc
          *          return code
@@ -48,6 +72,24 @@
          *          context object
          */
         void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
+
+        /**
+         * Callback declaration which additionally passes quorum write complete latency.
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param entryId
+         *          entry identifier
+         * @param qwcLatency
+         *          QuorumWriteComplete Latency
+         * @param ctx
+         *          context object
+         */
+        default void addCompleteWithLatency(int rc, LedgerHandle lh, long entryId, long qwcLatency, Object ctx) {
+            addComplete(rc, lh, entryId, ctx);
+        }
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index ba6b3fcf8..54cdc5d54 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -51,6 +51,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
@@ -908,7 +909,7 @@ public void asyncAddEntry(final byte[] data, final AddCallback cb,
      */
     public void asyncAddEntry(final long entryId, final byte[] data, final AddCallback cb, final Object ctx) {
         LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
-        cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, ctx);
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx);
     }
 
     /**
@@ -968,7 +969,32 @@ public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx)
     public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length,
             final AddCallback cb, final Object ctx) {
         LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
-        cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, ctx);
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx);
+    }
+
+    /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     *
+     * @param entryId
+     *            entryId of the entry to add
+     * @param data
+     *            array of bytes to be written
+     * @param offset
+     *            offset from which to take bytes from data
+     * @param length
+     *            number of bytes to take from data
+     * @param cb
+     *            object implementing callbackinterface
+     * @param ctx
+     *            some control object
+     * @throws ArrayIndexOutOfBoundsException
+     *             if offset or length is negative or offset and length sum to a
+     *             value higher than the length of data.
+     */
+    public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length,
+                              final AddCallbackWithLatency cb, final Object ctx) {
+        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx);
     }
 
     /**
@@ -1015,8 +1041,8 @@ protected void doAsyncAddEntry(final PendingAddOp op) {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
-                        op.cb.addComplete(BKException.Code.LedgerClosedException,
-                                LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
+                        op.cb.addCompleteWithLatency(BKException.Code.LedgerClosedException,
+                                LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
                     }
 
                     @Override
@@ -1025,8 +1051,8 @@ public String toString() {
                     }
                 });
             } catch (RejectedExecutionException e) {
-                op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                        LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
+                op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                        LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
             }
             return;
         }
@@ -1034,8 +1060,8 @@ public String toString() {
         try {
             bk.getMainWorkerPool().submitOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
-            op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                    LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
+            op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                    LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 0866db7f4..80581032b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
 import org.apache.bookkeeper.client.api.WriteAdvHandle;
 import org.apache.bookkeeper.client.api.WriteFlag;
@@ -151,8 +152,33 @@ public void asyncAddEntry(final long entryId, final byte[] data, final int offse
         asyncAddEntry(entryId, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
     }
 
+    /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     *
+     * @param entryId
+     *            entryId of the entry to add
+     * @param data
+     *            array of bytes to be written
+     * @param offset
+     *            offset from which to take bytes from data
+     * @param length
+     *            number of bytes to take from data
+     * @param cb
+     *            object implementing callbackinterface
+     * @param ctx
+     *            some control object
+     * @throws ArrayIndexOutOfBoundsException
+     *             if offset or length is negative or offset and length sum to a
+     *             value higher than the length of data.
+     */
+    @Override
+    public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length,
+                              final AddCallbackWithLatency cb, final Object ctx) {
+        asyncAddEntry(entryId, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
+    }
+
     private void asyncAddEntry(final long entryId, ByteBuf data,
-            final AddCallback cb, final Object ctx) {
+            final AddCallbackWithLatency cb, final Object ctx) {
         PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
         op.setEntryId(entryId);
 
@@ -196,8 +222,8 @@ protected void doAsyncAddEntry(final PendingAddOp op) {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
-                        op.cb.addComplete(BKException.Code.LedgerClosedException,
-                                LedgerHandleAdv.this, op.getEntryId(), op.ctx);
+                        op.cb.addCompleteWithLatency(BKException.Code.LedgerClosedException,
+                                LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
                     }
                     @Override
                     public String toString() {
@@ -205,8 +231,8 @@ public String toString() {
                     }
                 });
             } catch (RejectedExecutionException e) {
-                op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                        LedgerHandleAdv.this, op.getEntryId(), op.ctx);
+                op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                        LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
             }
             return;
         }
@@ -214,8 +240,8 @@ public String toString() {
         try {
             bk.getMainWorkerPool().submitOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
-            op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                              LedgerHandleAdv.this, op.getEntryId(), op.ctx);
+            op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                              LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
         }
     }
 
@@ -231,7 +257,7 @@ public String toString() {
      */
     @Override
     public void asyncAddEntry(ByteBuf data, AddCallback cb, Object ctx) {
-        cb.addComplete(BKException.Code.IllegalOpException, this, LedgerHandle.INVALID_ENTRY_ID, ctx);
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, this, LedgerHandle.INVALID_ENTRY_ID, 0, ctx);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 77f05c15a..0a1ac76d2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -30,7 +30,7 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -54,7 +54,7 @@
 
     ByteBuf payload;
     ByteBuf toSend;
-    AddCallback cb;
+    AddCallbackWithLatency cb;
     Object ctx;
     long entryId;
     int entryLength;
@@ -65,6 +65,7 @@
     LedgerHandle lh;
     boolean isRecoveryAdd = false;
     long requestTimeNanos;
+    long qwcLatency; // Quorum Write Completion Latency after response from quorum bookies.
 
     long timeoutNanos;
 
@@ -74,7 +75,7 @@
     boolean callbackTriggered;
     boolean hasRun;
 
-    static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallback cb, Object ctx) {
+    static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallbackWithLatency cb, Object ctx) {
         PendingAddOp op = RECYCLER.get();
         op.lh = lh;
         op.isRecoveryAdd = false;
@@ -93,6 +94,7 @@ static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallback cb, Obj
         op.callbackTriggered = false;
         op.hasRun = false;
         op.requestTimeNanos = Long.MAX_VALUE;
+        op.qwcLatency = 0;
         return op;
     }
 
@@ -322,6 +324,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre
 
         if (ackQuorum && !completed) {
             completed = true;
+            this.qwcLatency = MathUtils.elapsedNanos(requestTimeNanos);
 
             sendAddSuccessCallbacks();
         }
@@ -344,7 +347,7 @@ void submitCallback(final int rc) {
         } else {
             addOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
         }
-        cb.addComplete(rc, lh, entryId, ctx);
+        cb.addCompleteWithLatency(rc, lh, entryId, qwcLatency, ctx);
         callbackTriggered = true;
 
         maybeRecycle();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java
index daa434c01..9ab461a9b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.bookie.BookieException.Code.OK;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -31,7 +32,6 @@
 import java.util.Enumeration;
 import java.util.Random;
 
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Before;
@@ -48,7 +48,7 @@
  */
 @RunWith(Parameterized.class)
 public class BookieWriteLedgersWithDifferentDigestsTest extends
-    BookKeeperClusterTestCase implements AddCallback {
+    BookKeeperClusterTestCase implements AsyncCallback.AddCallbackWithLatency {
 
     private static final Logger LOG = LoggerFactory
             .getLogger(BookieWriteLedgersWithDifferentDigestsTest.class);
@@ -192,8 +192,11 @@ private void readEntries(LedgerHandle lh, ArrayList<byte[]> entries) throws Inte
     }
 
     @Override
-    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+    public void addCompleteWithLatency(int rc, LedgerHandle lh, long entryId, long qwcLatency, Object ctx) {
         SyncObj x = (SyncObj) ctx;
+        captureThrowable(() -> {
+            assertTrue("Successful write should have non-zero latency", rc != OK || qwcLatency > 0);
+        });
         synchronized (x) {
             x.rc = rc;
             x.counter++;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 526a4c7b1..9d9ea1b2d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -21,6 +21,8 @@
 
 package org.apache.bookkeeper.test;
 
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.base.Stopwatch;
 import java.io.File;
 import java.io.IOException;
@@ -32,6 +34,7 @@
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -92,6 +95,16 @@
 
     private boolean isAutoRecoveryEnabled;
 
+    SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
+    protected void captureThrowable(Runnable c) {
+        try {
+            c.run();
+        } catch (Throwable e) {
+            LOG.error("Captured error: {}", e);
+            asyncExceptions.add(e);
+        }
+    }
+
     public BookKeeperClusterTestCase(int numBookies) {
         this(numBookies, 120);
     }
@@ -124,6 +137,12 @@ public void setUp() throws Exception {
 
     @After
     public void tearDown() throws Exception {
+        boolean failed = false;
+        for (Throwable e : asyncExceptions) {
+            LOG.error("Got async exception: {}", e);
+            failed = true;
+        }
+        assertTrue("Async failure", !failed);
         Stopwatch sw = Stopwatch.createStarted();
         LOG.info("TearDown");
         Exception tearDownException = null;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services