You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/24 04:37:08 UTC
[pulsar] branch master updated: [pulsar-broker] schedule one add/read timeout task per ml/topic (#4111)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5e96a9c [pulsar-broker] schedule one add/read timeout task per ml/topic (#4111)
5e96a9c is described below
commit 5e96a9c99a46e4c52a339ba681157f9a3c4b2096
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Apr 23 21:37:03 2019 -0700
[pulsar-broker] schedule one add/read timeout task per ml/topic (#4111)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 160 ++++++++++++---------
.../bookkeeper/mledger/impl/OpReadEntry.java | 3 -
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 32 +++--
3 files changed, 116 insertions(+), 79 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index eea6c1f..c777b60 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -230,10 +230,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;
protected final Clock clock;
+
private static final AtomicLongFieldUpdater<ManagedLedgerImpl> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "readOpCount");
private volatile long readOpCount = 0;
-
+ // last read-operation's callback to check read-timeout on it.
+ private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK = AtomicReferenceFieldUpdater
+ .newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback");
+ private volatile ReadEntryCallbackWrapper lastReadCallback = null;
+
/**
* Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
* created asynchronously and hence there is no ready ledger to write into.
@@ -338,26 +343,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
scheduleTimeoutTask();
}
- private void scheduleTimeoutTask() {
- long timeoutSec = config.getAddEntryTimeoutSeconds();
- // disable timeout task checker if timeout <= 0
- if (timeoutSec > 0) {
- this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(() -> {
- OpAddEntry opAddEntry = pendingAddEntries.peek();
- if (opAddEntry != null) {
- boolean isTimedOut = opAddEntry.lastInitTime != -1
- && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec
- && opAddEntry.completed == FALSE;
- if (isTimedOut) {
- log.error("Failed to add entry for ledger {} in time-out {} sec",
- (opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
- opAddEntry.handleAddFailure(opAddEntry.ledger);
- }
- }
- }, config.getAddEntryTimeoutSeconds(), config.getAddEntryTimeoutSeconds(), TimeUnit.SECONDS);
- }
- }
-
private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] initializing bookkeeper; ledgers {}", name, ledgers);
@@ -1216,6 +1201,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (this.timeoutTask != null) {
this.timeoutTask.cancel(false);
}
+
}
private void closeAllCursors(CloseCallback callback, final Object ctx) {
@@ -1597,24 +1583,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
- long timeout = config.getReadEntryTimeoutSeconds();
- boolean checkTimeout = timeout > 0;
- if (checkTimeout) {
+ if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
+ long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, position.getLedgerId(),
- position.getEntryId(), callback, readOpCount, ctx);
- final ScheduledFuture<?> task = scheduledExecutor.schedule(() -> {
- // validate ReadEntryCallbackWrapper object is not recycled by bk-client callback (by validating
- // readOpCount) and fail the callback if read is not completed yet
- if (readCallback.readOpCount == readOpCount
- && ReadEntryCallbackWrapper.READ_COMPLETED_UPDATER.get(readCallback) == FALSE) {
- log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, ledger.getId(), position,
- timeout);
- readCallback.readEntryFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount);
- }
- }, timeout, TimeUnit.SECONDS);
- readCallback.task = task;
+ position.getEntryId(), callback, readOpCount, createdTime, ctx);
+ LAST_READ_CALLBACK.set(this, readCallback);
entryCache.asyncReadEntry(ledger, position, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, position, callback, ctx);
@@ -1623,24 +1598,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean isSlowestReader,
OpReadEntry opReadEntry, Object ctx) {
- long timeout = config.getReadEntryTimeoutSeconds();
- boolean checkTimeout = timeout > 0;
- if (checkTimeout) {
+ if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
+ long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
- opReadEntry, readOpCount, ctx);
- final ScheduledFuture<?> task = scheduledExecutor.schedule(() -> {
- // validate ReadEntryCallbackWrapper object is not recycled by bk-client callback (by validating
- // readOpCount) and fail the callback if read is not completed yet
- if (readCallback.readOpCount == readOpCount
- && ReadEntryCallbackWrapper.READ_COMPLETED_UPDATER.get(readCallback) == FALSE) {
- log.warn("[{}]-{} read entry timeout for {}-{} after {} sec", this.name, ledger.getId(), firstEntry,
- lastEntry, timeout);
- readCallback.readEntriesFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount);
- }
- }, timeout, TimeUnit.SECONDS);
- readCallback.task = task;
+ opReadEntry, readOpCount, createdTime, ctx);
+ LAST_READ_CALLBACK.set(this, readCallback);
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
@@ -1658,8 +1622,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
String name;
long ledgerId;
long entryId;
- ScheduledFuture<?> task;
volatile long readOpCount = -1;
+ volatile long createdTime = -1;
volatile Object cntx;
final Handle<ReadEntryCallbackWrapper> recyclerHandle;
@@ -1668,7 +1632,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
this.recyclerHandle = recyclerHandle;
}
- static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntryCallback callback, long readOpCount, Object ctx) {
+ static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntryCallback callback,
+ long readOpCount, long createdTime, Object ctx) {
ReadEntryCallbackWrapper readCallback = RECYCLER.get();
readCallback.name = name;
readCallback.ledgerId = ledgerId;
@@ -1676,10 +1641,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
readCallback.readEntryCallback = callback;
readCallback.cntx = ctx;
readCallback.readOpCount = readOpCount;
+ readCallback.createdTime = createdTime;
return readCallback;
}
- static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntriesCallback callback, long readOpCount, Object ctx) {
+ static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntriesCallback callback,
+ long readOpCount, long createdTime, Object ctx) {
ReadEntryCallbackWrapper readCallback = RECYCLER.get();
readCallback.name = name;
readCallback.ledgerId = ledgerId;
@@ -1687,13 +1654,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
readCallback.readEntriesCallback = callback;
readCallback.cntx = ctx;
readCallback.readOpCount = readOpCount;
+ readCallback.createdTime = createdTime;
return readCallback;
}
+ public boolean isTimedOut(long timeoutSec) {
+ return this.createdTime != -1
+ && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - this.createdTime) >= timeoutSec
+ && this.readCompleted == FALSE;
+ }
+
@Override
public void readEntryComplete(Entry entry, Object ctx) {
if (checkCallbackCompleted(ctx)) {
- log.warn("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+ }
entry.release();
return;
}
@@ -1704,7 +1680,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
if (checkCallbackCompleted(ctx)) {
- log.warn("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+ }
return;
}
readEntryCallback.readEntryFailed(exception, cntx);
@@ -1714,7 +1692,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
if (checkCallbackCompleted(ctx)) {
- log.warn("[{}] read entries already completed for {}-{}", name, ledgerId, entryId);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+ }
returnedEntries.forEach(Entry::release);
return;
}
@@ -1725,13 +1705,26 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
if (checkCallbackCompleted(ctx)) {
- log.warn("[{}] read entries already completed for {}-{}", name, ledgerId, entryId);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+ }
return;
}
readEntriesCallback.readEntriesFailed(exception, cntx);
recycle();
}
+ public void readFailed(ManagedLedgerException exception, Object ctx) {
+ if (readEntryCallback != null) {
+ readEntryFailed(exception, ctx);
+ } else if (readEntriesCallback != null) {
+ readEntriesFailed(exception, ctx);
+ } else {
+ // it should not happen .. recycle if none of the callback exists..
+ recycle();
+ }
+ }
+
private boolean checkCallbackCompleted(Object ctx) {
// if the ctx-readOpCount is different than object's readOpCount means Object is already recycled and
// assigned to different request
@@ -1742,13 +1735,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private void recycle() {
readOpCount = -1;
- if (task != null && !task.isDone() && !task.isCancelled()) {
- try {
- task.cancel(false);
- } catch (Throwable th) {
- log.debug("[{}]Failed to cancle task for read-callback for {}-{}", name, ledgerId, entryId);
- }
- }
+ createdTime = -1;
readEntryCallback = null;
readEntriesCallback = null;
ledgerId = -1;
@@ -3104,6 +3091,49 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return false;
}
+ private void scheduleTimeoutTask() {
+ // disable timeout task checker if timeout <= 0
+ if (config.getAddEntryTimeoutSeconds() > 0 || config.getReadEntryTimeoutSeconds() > 0) {
+ long timeoutSec = Math.min(config.getAddEntryTimeoutSeconds(), config.getReadEntryTimeoutSeconds());
+ this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
+ checkAddTimeout();
+ checkReadTimeout();
+ }), timeoutSec, timeoutSec, TimeUnit.SECONDS);
+ }
+ }
+
+ private void checkAddTimeout() {
+ long timeoutSec = config.getAddEntryTimeoutSeconds();
+ if (timeoutSec < 1) {
+ return;
+ }
+ OpAddEntry opAddEntry = pendingAddEntries.peek();
+ if (opAddEntry != null) {
+ boolean isTimedOut = opAddEntry.lastInitTime != -1
+ && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec
+ && opAddEntry.completed == FALSE;
+ if (isTimedOut) {
+ log.error("Failed to add entry for ledger {} in time-out {} sec",
+ (opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
+ opAddEntry.handleAddFailure(opAddEntry.ledger);
+ }
+ }
+ }
+
+ private void checkReadTimeout() {
+ long timeoutSec = config.getReadEntryTimeoutSeconds();
+ if (timeoutSec < 1) {
+ return;
+ }
+ ReadEntryCallbackWrapper callback = LAST_READ_CALLBACK.get(this);
+ if (callback != null && callback.isTimedOut(timeoutSec)) {
+ log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, callback.ledgerId, callback.entryId,
+ timeoutSec);
+ callback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException), callback.readOpCount);
+ LAST_READ_CALLBACK.set(this, null);
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index b02b0d8..dbe5d25 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.List;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
@@ -34,8 +33,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep
import org.apache.bookkeeper.mledger.Position;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
class OpReadEntry implements ReadEntriesCallback {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 497ae9e..d0abbbb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2277,8 +2277,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
BookKeeper bk = mock(BookKeeper.class);
doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
AtomicReference<ManagedLedgerException> responseException1 = new AtomicReference<>();
- CountDownLatch latch1 = new CountDownLatch(1);
-
+ String ctxStr = "timeoutCtx";
CompletableFuture<LedgerEntries> entriesFuture = new CompletableFuture<>();
ReadHandle ledgerHandle = mock(ReadHandle.class);
doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionImpl.earliest.getLedgerId(),
@@ -2289,27 +2288,27 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
responseException1.set(null);
- latch1.countDown();
}
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ assertEquals(ctxStr, (String) ctx);
responseException1.set(exception);
- latch1.countDown();
}
- }, null);
+ }, ctxStr);
ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
@Override
public void createComplete(int rc, LedgerHandle lh, Object ctx) {
}
}, Collections.emptyMap());
- latch1.await(config.getReadEntryTimeoutSeconds() + 2, TimeUnit.SECONDS);
+ retryStrategically((test) -> {
+ return responseException1.get() != null;
+ }, 5, 1000);
assertNotNull(responseException1.get());
assertEquals(responseException1.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException));
// (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
- CountDownLatch latch2 = new CountDownLatch(1);
AtomicReference<ManagedLedgerException> responseException2 = new AtomicReference<>();
PositionImpl readPositionRef = PositionImpl.earliest;
ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, "cursor1");
@@ -2317,19 +2316,20 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
- latch2.countDown();
}
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ assertEquals(ctxStr, (String) ctx);
responseException2.set(exception);
- latch2.countDown();
}
}, null);
ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest.getEntryId(), PositionImpl.earliest.getEntryId(),
- false, opReadEntry, null);
- latch2.await(config.getReadEntryTimeoutSeconds() + 2, TimeUnit.SECONDS);
+ false, opReadEntry, ctxStr);
+ retryStrategically((test) -> {
+ return responseException2.get() != null;
+ }, 5, 1000);
assertNotNull(responseException2.get());
assertEquals(responseException2.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException));
@@ -2405,4 +2405,14 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
field.setAccessible(true);
field.set(classObj, fieldValue);
}
+
+ public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
+ throws Exception {
+ for (int i = 0; i < retryCount; i++) {
+ if (predicate.test(null) || i == (retryCount - 1)) {
+ break;
+ }
+ Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
+ }
+ }
}