You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/24 04:37:08 UTC

[pulsar] branch master updated: [pulsar-broker] schedule one add/read timeout task per ml/topic (#4111)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e96a9c  [pulsar-broker] schedule one add/read timeout task per ml/topic (#4111)
5e96a9c is described below

commit 5e96a9c99a46e4c52a339ba681157f9a3c4b2096
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Apr 23 21:37:03 2019 -0700

    [pulsar-broker] schedule one add/read timeout task per ml/topic (#4111)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 160 ++++++++++++---------
 .../bookkeeper/mledger/impl/OpReadEntry.java       |   3 -
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  32 +++--
 3 files changed, 116 insertions(+), 79 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index eea6c1f..c777b60 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -230,10 +230,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     final ManagedLedgerFactoryImpl factory;
     protected final ManagedLedgerMBeanImpl mbean;
     protected final Clock clock;
+    
     private static final AtomicLongFieldUpdater<ManagedLedgerImpl> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ManagedLedgerImpl.class, "readOpCount");
     private volatile long readOpCount = 0;
-
+    // Callback of the most recent read operation; kept so the timeout task can check it for read-timeout.
+    private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK = AtomicReferenceFieldUpdater
+            .newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback");
+    private volatile ReadEntryCallbackWrapper lastReadCallback = null;
+    
     /**
      * Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
      * created asynchronously and hence there is no ready ledger to write into.
@@ -338,26 +343,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         scheduleTimeoutTask();
     }
 
-    private void scheduleTimeoutTask() {
-        long timeoutSec = config.getAddEntryTimeoutSeconds();
-        // disable timeout task checker if timeout <= 0
-        if (timeoutSec > 0) {
-            this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(() -> {
-                OpAddEntry opAddEntry = pendingAddEntries.peek();
-                if (opAddEntry != null) {
-                    boolean isTimedOut = opAddEntry.lastInitTime != -1
-                            && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec
-                            && opAddEntry.completed == FALSE;
-                    if (isTimedOut) {
-                        log.error("Failed to add entry for ledger {} in time-out {} sec",
-                                (opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
-                        opAddEntry.handleAddFailure(opAddEntry.ledger);
-                    }
-                }
-            }, config.getAddEntryTimeoutSeconds(), config.getAddEntryTimeoutSeconds(), TimeUnit.SECONDS);
-        }
-    }
-
     private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] initializing bookkeeper; ledgers {}", name, ledgers);
@@ -1216,6 +1201,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (this.timeoutTask != null) {
             this.timeoutTask.cancel(false);
         }
+        
     }
 
     private void closeAllCursors(CloseCallback callback, final Object ctx) {
@@ -1597,24 +1583,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
-        long timeout = config.getReadEntryTimeoutSeconds();
-        boolean checkTimeout = timeout > 0;
-        if (checkTimeout) {
+        if (config.getReadEntryTimeoutSeconds() > 0) {
             // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
             long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
+            long createdTime = System.nanoTime();
             ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, position.getLedgerId(),
-                    position.getEntryId(), callback, readOpCount, ctx);
-            final ScheduledFuture<?> task = scheduledExecutor.schedule(() -> {
-                // validate ReadEntryCallbackWrapper object is not recycled by bk-client callback (by validating
-                // readOpCount) and fail the callback if read is not completed yet
-                if (readCallback.readOpCount == readOpCount
-                        && ReadEntryCallbackWrapper.READ_COMPLETED_UPDATER.get(readCallback) == FALSE) {
-                    log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, ledger.getId(), position,
-                            timeout);
-                    readCallback.readEntryFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount);
-                }
-            }, timeout, TimeUnit.SECONDS);
-            readCallback.task = task;
+                    position.getEntryId(), callback, readOpCount, createdTime, ctx);
+            LAST_READ_CALLBACK.set(this, readCallback);
             entryCache.asyncReadEntry(ledger, position, readCallback, readOpCount);
         } else {
             entryCache.asyncReadEntry(ledger, position, callback, ctx);
@@ -1623,24 +1598,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean isSlowestReader,
             OpReadEntry opReadEntry, Object ctx) {
-        long timeout = config.getReadEntryTimeoutSeconds();
-        boolean checkTimeout = timeout > 0;
-        if (checkTimeout) {
+        if (config.getReadEntryTimeoutSeconds() > 0) {
             // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
             long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
+            long createdTime = System.nanoTime();
             ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
-                    opReadEntry, readOpCount, ctx);
-            final ScheduledFuture<?> task = scheduledExecutor.schedule(() -> {
-                // validate ReadEntryCallbackWrapper object is not recycled by bk-client callback (by validating
-                // readOpCount) and fail the callback if read is not completed yet
-                if (readCallback.readOpCount == readOpCount
-                        && ReadEntryCallbackWrapper.READ_COMPLETED_UPDATER.get(readCallback) == FALSE) {
-                    log.warn("[{}]-{} read entry timeout for {}-{} after {} sec", this.name, ledger.getId(), firstEntry,
-                            lastEntry, timeout);
-                    readCallback.readEntriesFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount);
-                }
-            }, timeout, TimeUnit.SECONDS);
-            readCallback.task = task;
+                    opReadEntry, readOpCount, createdTime, ctx);
+            LAST_READ_CALLBACK.set(this, readCallback);
             entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
         } else {
             entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
@@ -1658,8 +1622,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         String name;
         long ledgerId;
         long entryId;
-        ScheduledFuture<?> task;
         volatile long readOpCount = -1;
+        volatile long createdTime = -1;
         volatile Object cntx;
 
         final Handle<ReadEntryCallbackWrapper> recyclerHandle;
@@ -1668,7 +1632,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             this.recyclerHandle = recyclerHandle;
         }
 
-        static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntryCallback callback, long readOpCount, Object ctx) {
+        static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntryCallback callback,
+                long readOpCount, long createdTime, Object ctx) {
             ReadEntryCallbackWrapper readCallback = RECYCLER.get();
             readCallback.name = name;
             readCallback.ledgerId = ledgerId;
@@ -1676,10 +1641,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             readCallback.readEntryCallback = callback;
             readCallback.cntx = ctx;
             readCallback.readOpCount = readOpCount;
+            readCallback.createdTime = createdTime;
             return readCallback;
         }
 
-        static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntriesCallback callback, long readOpCount, Object ctx) {
+        static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntriesCallback callback,
+                long readOpCount, long createdTime, Object ctx) {
             ReadEntryCallbackWrapper readCallback = RECYCLER.get();
             readCallback.name = name;
             readCallback.ledgerId = ledgerId;
@@ -1687,13 +1654,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             readCallback.readEntriesCallback = callback;
             readCallback.cntx = ctx;
             readCallback.readOpCount = readOpCount;
+            readCallback.createdTime = createdTime;
             return readCallback;
         }
 
+        public boolean isTimedOut(long timeoutSec) {
+            return this.createdTime != -1
+                    && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - this.createdTime) >= timeoutSec
+                    && this.readCompleted == FALSE;
+        }
+
         @Override
         public void readEntryComplete(Entry entry, Object ctx) {
             if (checkCallbackCompleted(ctx)) {
-                log.warn("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+                }
                 entry.release();
                 return;
             }
@@ -1704,7 +1680,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         @Override
         public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
             if (checkCallbackCompleted(ctx)) {
-                log.warn("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+                }
                 return;
             }
             readEntryCallback.readEntryFailed(exception, cntx);
@@ -1714,7 +1692,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         @Override
         public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
             if (checkCallbackCompleted(ctx)) {
-                log.warn("[{}] read entries already completed for {}-{}", name, ledgerId, entryId);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+                }
                 returnedEntries.forEach(Entry::release);
                 return;
             }
@@ -1725,13 +1705,26 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         @Override
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
             if (checkCallbackCompleted(ctx)) {
-                log.warn("[{}] read entries already completed for {}-{}", name, ledgerId, entryId);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+                }
                 return;
             }
             readEntriesCallback.readEntriesFailed(exception, cntx);
             recycle();
         }
 
+        public void readFailed(ManagedLedgerException exception, Object ctx) {
+            if (readEntryCallback != null) {
+                readEntryFailed(exception, ctx);
+            } else if (readEntriesCallback != null) {
+                readEntriesFailed(exception, ctx);
+            } else {
+                // should not happen: neither callback is set, so just recycle the wrapper
+                recycle();
+            }
+        }
+        
         private boolean checkCallbackCompleted(Object ctx) {
             // if the ctx-readOpCount is different than object's readOpCount means Object is already recycled and
             // assigned to different request
@@ -1742,13 +1735,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
         private void recycle() {
             readOpCount = -1;
-            if (task != null && !task.isDone() && !task.isCancelled()) {
-                try {
-                    task.cancel(false);
-                } catch (Throwable th) {
-                    log.debug("[{}]Failed to cancle task for read-callback for {}-{}", name, ledgerId, entryId);
-                }
-            }
+            createdTime = -1;
             readEntryCallback = null;
             readEntriesCallback = null;
             ledgerId = -1;
@@ -3104,6 +3091,49 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return false;
     }
 
+    private void scheduleTimeoutTask() {
+        long addSec = config.getAddEntryTimeoutSeconds(), readSec = config.getReadEntryTimeoutSeconds();
+        // Check period = smallest enabled (> 0) timeout; stays <= 0 (checker disabled) only when both are disabled.
+        long timeoutSec = (addSec > 0 && readSec > 0) ? Math.min(addSec, readSec) : Math.max(addSec, readSec);
+        if (timeoutSec > 0) {
+            this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(
+                    safeRun(() -> { checkAddTimeout(); checkReadTimeout(); }),
+                    timeoutSec, timeoutSec, TimeUnit.SECONDS);
+        }
+    }
+
+    private void checkAddTimeout() {
+        long timeoutSec = config.getAddEntryTimeoutSeconds();
+        if (timeoutSec < 1) {
+            return;
+        }
+        OpAddEntry opAddEntry = pendingAddEntries.peek();
+        if (opAddEntry != null) {
+            boolean isTimedOut = opAddEntry.lastInitTime != -1
+                    && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec
+                    && opAddEntry.completed == FALSE;
+            if (isTimedOut) {
+                log.error("Failed to add entry for ledger {} in time-out {} sec",
+                        (opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
+                opAddEntry.handleAddFailure(opAddEntry.ledger);
+            }
+        }
+    }
+
+    private void checkReadTimeout() {
+        long timeoutSec = config.getReadEntryTimeoutSeconds();
+        if (timeoutSec < 1) { // read-timeout check disabled
+            return;
+        }
+        ReadEntryCallbackWrapper callback = LAST_READ_CALLBACK.get(this);
+        if (callback != null && callback.isTimedOut(timeoutSec)) {
+            log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, callback.ledgerId, callback.entryId,
+                    timeoutSec);
+            LAST_READ_CALLBACK.compareAndSet(this, callback, null); // clear only if no newer read replaced it
+            callback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException), callback.readOpCount);
+        }
+    }
+    
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index b02b0d8..dbe5d25 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -34,8 +33,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep
 import org.apache.bookkeeper.mledger.Position;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
 
 class OpReadEntry implements ReadEntriesCallback {
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 497ae9e..d0abbbb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2277,8 +2277,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         BookKeeper bk = mock(BookKeeper.class);
         doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
         AtomicReference<ManagedLedgerException> responseException1 = new AtomicReference<>();
-        CountDownLatch latch1 = new CountDownLatch(1);
-
+        String ctxStr = "timeoutCtx";
         CompletableFuture<LedgerEntries> entriesFuture = new CompletableFuture<>();
         ReadHandle ledgerHandle = mock(ReadHandle.class);
         doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionImpl.earliest.getLedgerId(),
@@ -2289,27 +2288,27 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
                 responseException1.set(null);
-                latch1.countDown();
             }
 
             @Override
             public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                assertEquals(ctxStr, (String) ctx);
                 responseException1.set(exception);
-                latch1.countDown();
             }
-        }, null);
+        }, ctxStr);
         ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
             @Override
             public void createComplete(int rc, LedgerHandle lh, Object ctx) {
 
             }
         }, Collections.emptyMap());
-        latch1.await(config.getReadEntryTimeoutSeconds() + 2, TimeUnit.SECONDS);
+        retryStrategically((test) -> {
+            return responseException1.get() != null;
+        }, 5, 1000);
         assertNotNull(responseException1.get());
         assertEquals(responseException1.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException));
 
         // (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
-        CountDownLatch latch2 = new CountDownLatch(1);
         AtomicReference<ManagedLedgerException> responseException2 = new AtomicReference<>();
         PositionImpl readPositionRef = PositionImpl.earliest;
         ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, "cursor1");
@@ -2317,19 +2316,20 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
             @Override
             public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                latch2.countDown();
             }
 
             @Override
             public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                assertEquals(ctxStr, (String) ctx);
                 responseException2.set(exception);
-                latch2.countDown();
             }
 
         }, null);
         ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest.getEntryId(), PositionImpl.earliest.getEntryId(),
-                false, opReadEntry, null);
-        latch2.await(config.getReadEntryTimeoutSeconds() + 2, TimeUnit.SECONDS);
+                false, opReadEntry, ctxStr);
+        retryStrategically((test) -> {
+            return responseException2.get() != null;
+        }, 5, 1000);
         assertNotNull(responseException2.get());
         assertEquals(responseException2.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException));
 
@@ -2405,4 +2405,14 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         field.setAccessible(true);
         field.set(classObj, fieldValue);
     }
+    
+    public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
+            throws Exception {
+        for (int i = 0; i < retryCount; i++) {
+            if (predicate.test(null) || i == (retryCount - 1)) {
+                break;
+            }
+            Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
+        }
+    }
 }