You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/02/18 08:44:44 UTC

[bookkeeper] branch master updated: ISSUE #966: Expose quorum write complete latency to the client

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new f17b422  ISSUE #966: Expose quorum write complete latency to the client
f17b422 is described below

commit f17b422f2474e436ab7722dca16b778aa626f8ca
Author: JV <vj...@salesforce.com>
AuthorDate: Sun Feb 18 00:44:32 2018 -0800

    ISSUE #966: Expose quorum write complete latency to the client
    
    Add quorum write complete latency to the bookkeeper client API. This
    allows callers to find out exactly how long the quorum write took for
    diagnostic purposes.
    
    (bug W-3616193)
    Signed-off-by: Venkateswararao Jujjuri (JV) <vjujjurisalesforce.com>
    [Fixed merge conflicts, added testing]
    Signed-off-by: Samuel Just <sjustsalesforce.com>
    
    Author: JV <vj...@salesforce.com>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #970 from athanatos/forupstream/stats1/qlatency, closes #966
---
 .../apache/bookkeeper/client/AsyncCallback.java    | 46 +++++++++++++++++++++-
 .../org/apache/bookkeeper/client/LedgerHandle.java | 42 ++++++++++++++++----
 .../apache/bookkeeper/client/LedgerHandleAdv.java  | 42 ++++++++++++++++----
 .../org/apache/bookkeeper/client/PendingAddOp.java | 11 ++++--
 ...BookieWriteLedgersWithDifferentDigestsTest.java |  9 +++--
 .../bookkeeper/test/BookKeeperClusterTestCase.java | 19 +++++++++
 6 files changed, 144 insertions(+), 25 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
index 7e19f44..cde3f06 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
@@ -26,6 +26,30 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public interface AsyncCallback {
+    /**
+     * Async callback for adding entries to ledgers which also reports the quorum write latency.
+     *
+     * @since 4.7
+     */
+    @InterfaceAudience.Public
+    @InterfaceStability.Evolving
+    interface AddCallbackWithLatency {
+        /**
+         * Callback declaration which additionally passes quorum write complete latency.
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param entryId
+         *          entry identifier
+         * @param qwcLatency
+         *          quorum write complete latency in nanoseconds; callers pass 0 on early failure paths
+         * @param ctx
+         *          context object
+         */
+        void addCompleteWithLatency(int rc, LedgerHandle lh, long entryId, long qwcLatency, Object ctx);
+    }
 
     /**
      * Async Callback for adding entries to ledgers.
@@ -34,9 +58,9 @@ public interface AsyncCallback {
      */
     @InterfaceAudience.Public
     @InterfaceStability.Stable
-    interface AddCallback {
+    interface AddCallback extends AddCallbackWithLatency {
         /**
-         * Callback declaration.
+         * Callback to implement if latency information is not desired.
          *
          * @param rc
          *          return code
@@ -48,6 +72,24 @@ public interface AsyncCallback {
          *          context object
          */
         void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
+
+        /**
+         * Default implementation which drops the latency and delegates to {@code addComplete}.
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param entryId
+         *          entry identifier
+         * @param qwcLatency
+         *          quorum write complete latency; not forwarded by this default implementation
+         * @param ctx
+         *          context object
+         */
+        default void addCompleteWithLatency(int rc, LedgerHandle lh, long entryId, long qwcLatency, Object ctx) {
+            addComplete(rc, lh, entryId, ctx);
+        }
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 007f4ce..7047b37 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
@@ -943,7 +944,7 @@ public class LedgerHandle implements WriteHandle {
      */
     public void asyncAddEntry(final long entryId, final byte[] data, final AddCallback cb, final Object ctx) {
         LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
-        cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, ctx);
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx);
     }
 
     /**
@@ -1003,7 +1004,32 @@ public class LedgerHandle implements WriteHandle {
     public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length,
             final AddCallback cb, final Object ctx) {
         LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
-        cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, ctx);
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx);
+    }
+
+    /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     *
+     * @param entryId
+     *            entryId of the entry to add
+     * @param data
+     *            array of bytes to be written
+     * @param offset
+     *            offset from which to take bytes from data
+     * @param length
+     *            number of bytes to take from data
+     * @param cb
+     *            callback implementation; this class always completes it with IllegalOpException and latency 0
+     * @param ctx
+     *            some control object
+     * @throws ArrayIndexOutOfBoundsException
+     *             if offset or length is negative or offset and length sum to a
+     *             value higher than the length of data.
+     */
+    public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length,
+                              final AddCallbackWithLatency cb, final Object ctx) {
+        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx);
     }
 
     /**
@@ -1050,8 +1076,8 @@ public class LedgerHandle implements WriteHandle {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
-                        op.cb.addComplete(BKException.Code.LedgerClosedException,
-                                LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
+                        op.cb.addCompleteWithLatency(BKException.Code.LedgerClosedException,
+                                LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
                     }
 
                     @Override
@@ -1060,8 +1086,8 @@ public class LedgerHandle implements WriteHandle {
                     }
                 });
             } catch (RejectedExecutionException e) {
-                op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                        LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
+                op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                        LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
             }
             return;
         }
@@ -1069,8 +1095,8 @@ public class LedgerHandle implements WriteHandle {
         try {
             bk.getMainWorkerPool().submitOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
-            op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                    LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
+            op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                    LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 0866db7..8058103 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
 import org.apache.bookkeeper.client.api.WriteAdvHandle;
 import org.apache.bookkeeper.client.api.WriteFlag;
@@ -151,8 +152,33 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
         asyncAddEntry(entryId, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
     }
 
+    /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     *
+     * @param entryId
+     *            entryId of the entry to add
+     * @param data
+     *            array of bytes to be written
+     * @param offset
+     *            offset from which to take bytes from data
+     * @param length
+     *            number of bytes to take from data
+     * @param cb
+     *            object implementing the callback interface; it is handed to the ByteBuf-based overload
+     * @param ctx
+     *            some control object
+     * @throws ArrayIndexOutOfBoundsException
+     *             if offset or length is negative or offset and length sum to a
+     *             value higher than the length of data.
+     */
+    @Override
+    public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length,
+                              final AddCallbackWithLatency cb, final Object ctx) {
+        asyncAddEntry(entryId, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
+    }
+
     private void asyncAddEntry(final long entryId, ByteBuf data,
-            final AddCallback cb, final Object ctx) {
+            final AddCallbackWithLatency cb, final Object ctx) {
         PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
         op.setEntryId(entryId);
 
@@ -196,8 +222,8 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
-                        op.cb.addComplete(BKException.Code.LedgerClosedException,
-                                LedgerHandleAdv.this, op.getEntryId(), op.ctx);
+                        op.cb.addCompleteWithLatency(BKException.Code.LedgerClosedException,
+                                LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
                     }
                     @Override
                     public String toString() {
@@ -205,8 +231,8 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
                     }
                 });
             } catch (RejectedExecutionException e) {
-                op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                        LedgerHandleAdv.this, op.getEntryId(), op.ctx);
+                op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                        LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
             }
             return;
         }
@@ -214,8 +240,8 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
         try {
             bk.getMainWorkerPool().submitOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
-            op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                              LedgerHandleAdv.this, op.getEntryId(), op.ctx);
+            op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                              LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
         }
     }
 
@@ -231,7 +257,7 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
      */
     @Override
     public void asyncAddEntry(ByteBuf data, AddCallback cb, Object ctx) {
-        cb.addComplete(BKException.Code.IllegalOpException, this, LedgerHandle.INVALID_ENTRY_ID, ctx);
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, this, LedgerHandle.INVALID_ENTRY_ID, 0, ctx);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index e386701..8d65c00 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -30,7 +30,7 @@ import java.util.Map;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -54,7 +54,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
 
     ByteBuf payload;
     ByteBuf toSend;
-    AddCallback cb;
+    AddCallbackWithLatency cb;
     Object ctx;
     long entryId;
     int entryLength;
@@ -65,6 +65,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
     LedgerHandle lh;
     boolean isRecoveryAdd = false;
     long requestTimeNanos;
+    long qwcLatency; // Quorum Write Completion Latency after response from quorum bookies.
 
     long timeoutNanos;
 
@@ -74,7 +75,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
     boolean callbackTriggered;
     boolean hasRun;
 
-    static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallback cb, Object ctx) {
+    static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallbackWithLatency cb, Object ctx) {
         PendingAddOp op = RECYCLER.get();
         op.lh = lh;
         op.isRecoveryAdd = false;
@@ -93,6 +94,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         op.callbackTriggered = false;
         op.hasRun = false;
         op.requestTimeNanos = Long.MAX_VALUE;
+        op.qwcLatency = 0;
         return op;
     }
 
@@ -322,6 +324,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
 
         if (ackQuorum && !completed) {
             completed = true;
+            this.qwcLatency = MathUtils.elapsedNanos(requestTimeNanos);
 
             sendAddSuccessCallbacks();
         }
@@ -344,7 +347,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         } else {
             addOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
         }
-        cb.addComplete(rc, lh, entryId, ctx);
+        cb.addCompleteWithLatency(rc, lh, entryId, qwcLatency, ctx);
         callbackTriggered = true;
 
         maybeRecycle();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java
index daa434c..9ab461a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.bookie.BookieException.Code.OK;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -31,7 +32,6 @@ import java.util.Collection;
 import java.util.Enumeration;
 import java.util.Random;
 
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Before;
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
  */
 @RunWith(Parameterized.class)
 public class BookieWriteLedgersWithDifferentDigestsTest extends
-    BookKeeperClusterTestCase implements AddCallback {
+    BookKeeperClusterTestCase implements AsyncCallback.AddCallbackWithLatency {
 
     private static final Logger LOG = LoggerFactory
             .getLogger(BookieWriteLedgersWithDifferentDigestsTest.class);
@@ -192,8 +192,11 @@ public class BookieWriteLedgersWithDifferentDigestsTest extends
     }
 
     @Override
-    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+    public void addCompleteWithLatency(int rc, LedgerHandle lh, long entryId, long qwcLatency, Object ctx) {
         SyncObj x = (SyncObj) ctx;
+        captureThrowable(() -> {
+            assertTrue("Successful write should have non-zero latency", rc != OK || qwcLatency > 0);
+        });
         synchronized (x) {
             x.rc = rc;
             x.counter++;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 6365c15..b965d15 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -21,6 +21,8 @@
 
 package org.apache.bookkeeper.test;
 
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.base.Stopwatch;
 import java.io.File;
 import java.io.IOException;
@@ -32,6 +34,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -97,6 +100,16 @@ public abstract class BookKeeperClusterTestCase {
 
     private boolean isAutoRecoveryEnabled;
 
+    java.util.Queue<Throwable> asyncExceptions = new java.util.concurrent.ConcurrentLinkedQueue<>(); // unbounded
+    protected void captureThrowable(Runnable c) { // runs c; any failure is logged and kept for tearDown()
+        try {
+            c.run();
+        } catch (Throwable e) {
+            LOG.error("Captured error", e); // Throwable as last argument, so the stack trace is logged
+            asyncExceptions.add(e); // a SynchronousQueue would throw here and always iterate as empty
+        }
+    }
+
     public BookKeeperClusterTestCase(int numBookies) {
         this(numBookies, 120);
     }
@@ -128,6 +141,12 @@ public abstract class BookKeeperClusterTestCase {
 
     @After
     public void tearDown() throws Exception {
+        boolean failed = false;
+        for (Throwable e : asyncExceptions) {
+            LOG.error("Got async exception: {}", e);
+            failed = true;
+        }
+        assertTrue("Async failure", !failed);
         Stopwatch sw = Stopwatch.createStarted();
         LOG.info("TearDown");
         Exception tearDownException = null;

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.