You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2018/09/17 04:55:26 UTC
[incubator-pulsar] branch master updated: Add ledger op timeout to
avoid topics stuck on ledger-creation (#2535)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d5e88c1 Add ledger op timeout to avoid topics stuck on ledger-creation (#2535)
d5e88c1 is described below
commit d5e88c1ec16df557655e42c9f648a2fd3343d759
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Sep 16 21:55:21 2018 -0700
Add ledger op timeout to avoid topics stuck on ledger-creation (#2535)
* Add ledger op timeout to avoid topics stuck on ledger-creation
* rename to metadataOperationsTimeoutSeconds
* add service config for managedLedgerMetadataOperationsTimeoutSeconds
---
conf/broker.conf | 3 +
conf/standalone.conf | 3 +
.../bookkeeper/mledger/ManagedLedgerConfig.java | 21 +++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 101 +++++++-------
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 150 ++++++++++++++-------
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 47 ++++++-
.../apache/pulsar/broker/ServiceConfiguration.java | 10 ++
.../pulsar/broker/service/BrokerService.java | 2 +
8 files changed, 234 insertions(+), 103 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 8927a85..2277cc8 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -378,6 +378,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
# corrupted at bookkeeper and managed-cursor is stuck at that ledger.
autoSkipNonRecoverableData=false
+# operation timeout while updating managed-ledger metadata.
+managedLedgerMetadataOperationsTimeoutSeconds=60
+
### --- Load balancer --- ###
# Enable load balancer
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a68664c..cc8f564 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -318,6 +318,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
# corrupted at bookkeeper and managed-cursor is stuck at that ledger.
autoSkipNonRecoverableData=false
+# operation timeout while updating managed-ledger metadata.
+managedLedgerMetadataOperationsTimeoutSeconds=60
+
### --- Load balancer --- ###
loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 698d245..5967453 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -56,6 +56,7 @@ public class ManagedLedgerConfig {
private boolean autoSkipNonRecoverableData;
private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
private long offloadAutoTriggerSizeThresholdBytes = -1;
+ private long metadataOperationsTimeoutSeconds = 60;
private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);
@@ -511,4 +512,24 @@ public class ManagedLedgerConfig {
this.clock = clock;
return this;
}
+
+ /**
+ * Returns the timeout, in seconds, for managed-ledger metadata operations (ledger create/delete).
+ * The field defaults to 60.
+ *
+ * @return the configured timeout in seconds
+ */
+ public long getMetadataOperationsTimeoutSeconds() {
+ return metadataOperationsTimeoutSeconds;
+ }
+
+ /**
+ * Sets the Ledger-Op (Create/Delete) timeout after which the pending callback will be completed with failure.
+ *
+ * @param metadataOperationsTimeoutSeconds
+ *            timeout value, in seconds
+ * @return this config instance, so that setter calls can be chained
+ */
+ public ManagedLedgerConfig setMetadataOperationsTimeoutSeconds(long metadataOperationsTimeoutSeconds) {
+ this.metadataOperationsTimeoutSeconds = metadataOperationsTimeoutSeconds;
+ return this;
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 50b4722..0ac818f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1974,68 +1974,73 @@ public class ManagedCursorImpl implements ManagedCursor {
void createNewMetadataLedger(final VoidCallback callback) {
ledger.mbean.startCursorLedgerCreateOp();
- bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(),
- config.getMetadataAckQuorumSize(), digestType, config.getPassword(), (rc, lh, ctx) -> {
- ledger.getExecutor().execute(safeRun(() -> {
- ledger.mbean.endCursorLedgerCreateOp();
- if (rc != BKException.Code.OK) {
- log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
- BKException.getMessage(rc));
- callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
- return;
- }
+ ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
+
+ if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+ return;
+ }
+ ledger.getExecutor().execute(safeRun(() -> {
+ ledger.mbean.endCursorLedgerCreateOp();
+ if (rc != BKException.Code.OK) {
+ log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
+ BKException.getMessage(rc));
+ callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
+ return;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name);
+ }
+ // Created the ledger, now write the last position
+ // content
+ MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
+ persistPositionToLedger(lh, mdEntry, new VoidCallback() {
+ @Override
+ public void operationComplete() {
if (log.isDebugEnabled()) {
- log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name);
+ log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
+ mdEntry.newPosition, name);
}
- // Created the ledger, now write the last position
- // content
- MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
- persistPositionToLedger(lh, mdEntry, new VoidCallback() {
+ switchToNewLedger(lh, new VoidCallback() {
@Override
public void operationComplete() {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
- mdEntry.newPosition, name);
- }
- switchToNewLedger(lh, new VoidCallback() {
- @Override
- public void operationComplete() {
- callback.operationComplete();
- }
-
- @Override
- public void operationFailed(ManagedLedgerException exception) {
- // it means it failed to switch the newly created ledger so, it should be
- // deleted to prevent leak
- bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> {
- if (rc != BKException.Code.OK) {
- log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
- lh.getId());
- }
- }, null);
- callback.operationFailed(exception);
- }
- });
+ callback.operationComplete();
}
@Override
public void operationFailed(ManagedLedgerException exception) {
- log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
- mdEntry.newPosition, name);
-
- ledger.mbean.startCursorLedgerDeleteOp();
- bookkeeper.asyncDeleteLedger(lh.getId(), new DeleteCallback() {
- @Override
- public void deleteComplete(int rc, Object ctx) {
- ledger.mbean.endCursorLedgerDeleteOp();
+ // it means it failed to switch the newly created ledger so, it should be
+ // deleted to prevent leak
+ bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> {
+ if (rc != BKException.Code.OK) {
+ log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
+ lh.getId());
}
}, null);
callback.operationFailed(exception);
}
});
- }));
- }, null, Collections.emptyMap());
+ }
+
+ @Override
+ public void operationFailed(ManagedLedgerException exception) {
+ log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
+ mdEntry.newPosition, name);
+
+ ledger.mbean.startCursorLedgerDeleteOp();
+ bookkeeper.asyncDeleteLedger(lh.getId(), new DeleteCallback() {
+ @Override
+ public void deleteComplete(int rc, Object ctx) {
+ ledger.mbean.endCursorLedgerDeleteOp();
+ }
+ }, null);
+ callback.operationFailed(exception);
+ }
+ });
+ }));
+ }, Collections.emptyMap());
+
}
private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f332e17..d33f151 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -22,17 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
-import com.google.common.collect.BoundType;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Range;
-import com.google.common.util.concurrent.RateLimiter;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
import java.time.Clock;
import java.util.Collections;
import java.util.Iterator;
@@ -51,6 +40,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
@@ -62,6 +52,7 @@ import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.Backoff;
@@ -111,6 +102,17 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Range;
+import com.google.common.util.concurrent.RateLimiter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final static long MegaByte = 1024 * 1024;
@@ -185,7 +187,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
-
+
enum State {
None, // Uninitialized
LedgerOpened, // A ledger is ready to write into
@@ -364,39 +366,44 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// Create a new ledger to start writing
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
- bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
- digestType, config.getPassword(), (rc, lh, ctx) -> {
- executor.executeOrdered(name, safeRun(() -> {
- mbean.endDataLedgerCreateOp();
- if (rc != BKException.Code.OK) {
- callback.initializeFailed(createManagedLedgerException(rc));
- return;
- }
+
+ asyncCreateLedger(bookKeeper, config, digestType, (rc, lh, ctx) -> {
- log.info("[{}] Created ledger {}", name, lh.getId());
- STATE_UPDATER.set(this, State.LedgerOpened);
- lastLedgerCreatedTimestamp = clock.millis();
- currentLedger = lh;
-
- lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
- // bypass empty ledgers, find last ledger with Message if possible.
- while (lastConfirmedEntry.getEntryId() == -1) {
- Map.Entry<Long, LedgerInfo> formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
- if (formerLedger != null) {
- LedgerInfo ledgerInfo = formerLedger.getValue();
- lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
- } else {
- break;
- }
- }
+ if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+ return;
+ }
+
+ executor.executeOrdered(name, safeRun(() -> {
+ mbean.endDataLedgerCreateOp();
+ if (rc != BKException.Code.OK) {
+ callback.initializeFailed(createManagedLedgerException(rc));
+ return;
+ }
- LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
- ledgers.put(lh.getId(), info);
+ log.info("[{}] Created ledger {}", name, lh.getId());
+ STATE_UPDATER.set(this, State.LedgerOpened);
+ lastLedgerCreatedTimestamp = clock.millis();
+ currentLedger = lh;
+
+ lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
+ // bypass empty ledgers, find last ledger with Message if possible.
+ while (lastConfirmedEntry.getEntryId() == -1) {
+ Map.Entry<Long, LedgerInfo> formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+ if (formerLedger != null) {
+ LedgerInfo ledgerInfo = formerLedger.getValue();
+ lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+ } else {
+ break;
+ }
+ }
+
+ LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
+ ledgers.put(lh.getId(), info);
- // Save it back to ensure all nodes exist
- store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, storeLedgersCb);
- }));
- }, null, Collections.emptyMap());
+ // Save it back to ensure all nodes exist
+ store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, storeLedgersCb);
+ }));
+ }, Collections.emptyMap());
}
private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) {
@@ -564,9 +571,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
- bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
- config.getAckQuorumSize(), digestType, config.getPassword(), this, null,
- Collections.emptyMap());
+ asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
}
} else {
checkArgument(state == State.LedgerOpened, "ledger=%s is not opened", state);
@@ -1155,6 +1160,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (log.isDebugEnabled()) {
log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != null ? lh.getId() : -1);
}
+
+ if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+ return;
+ }
+
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc));
@@ -1320,9 +1330,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
STATE_UPDATER.set(this, State.CreatingLedger);
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
- bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
- config.getAckQuorumSize(), digestType, config.getPassword(), this, null,
- Collections.emptyMap());
+ asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
}
}
@@ -2796,6 +2804,52 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
+ /**
+  * Creates a ledger asynchronously and schedules a timeout task. If the ledger-creation has not been completed
+  * when the task runs, {@code cb} is completed with {@code BKException.Code.TimeoutException}, a {@code null}
+  * ledger handle and a {@code null} context.
+  *
+  * <p>
+  * An {@link AtomicBoolean} is handed to BookKeeper as the callback context and acts as a "completed" flag
+  * shared between the timeout task and the real callback. Callbacks are expected to pass their arguments to
+  * {@link #checkAndCompleteLedgerOpTask(int, LedgerHandle, Object)} first and to return immediately when it
+  * reports that the operation was already completed.
+  *
+  * @param bookKeeper
+  *            client used to create the ledger
+  * @param config
+  *            provides ensemble/quorum sizes, password and the operation timeout (in seconds)
+  * @param digestType
+  *            digest type of the new ledger
+  * @param cb
+  *            callback notified on creation, or on timeout
+  * @param emptyMap
+  *            currently ignored: an empty custom-metadata map is always passed to BookKeeper
+  */
+ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType digestType,
+         CreateCallback cb, Map<Object, Object> emptyMap) {
+     AtomicBoolean ledgerCreated = new AtomicBoolean(false);
+     bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
+             digestType, config.getPassword(), cb, ledgerCreated, Collections.emptyMap());
+     scheduledExecutor.schedule(() -> {
+         // Claim the flag atomically: a separate get() followed by set(true) would let this task and the
+         // real callback both observe "false" and complete the operation twice.
+         if (ledgerCreated.compareAndSet(false, true)) {
+             cb.createComplete(BKException.Code.TimeoutException, null, null);
+         }
+     }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
+ }
+
+ /**
+  * Checks whether a ledger-op task was already completed by the timeout-task and, if not, marks it as
+  * completed on behalf of the caller.
+  *
+  * <p>
+  * Only an {@link AtomicBoolean} context is inspected; for any other context (including {@code null}, which is
+  * what the timeout-task passes) this method returns {@code false} without side effects. When the flag was
+  * already set and {@code rc} is {@code BKException.Code.OK}, the ledger behind {@code lh} is deleted because
+  * the failure was already reported for this operation.
+  *
+  * @param rc
+  *            BookKeeper result code of the create operation
+  * @param lh
+  *            handle of the created ledger; only dereferenced when {@code rc} is OK
+  * @param ctx
+  *            callback context received from BookKeeper
+  * @return {@code true} if the operation was already completed and the caller must not process the callback
+  *         any further, {@code false} otherwise
+  */
+ protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
+     // instanceof is false for null, so no separate null check is needed
+     if (!(ctx instanceof AtomicBoolean)) {
+         return false;
+     }
+     // Atomically flip false -> true. Succeeding means this callback owns the completion; a separate
+     // get()/set(true) pair could race with the timeout-task and let both sides complete the operation.
+     if (((AtomicBoolean) ctx).compareAndSet(false, true)) {
+         return false;
+     }
+     // ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
+     if (rc == BKException.Code.OK) {
+         log.warn("[{}]-{} ledger creation timed-out, deleting ledger", this.name, lh.getId());
+         asyncDeleteLedger(lh.getId(), DEFAULT_LEDGER_DELETE_RETRIES);
+     }
+     return true;
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d7cce97..aa85d80 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -25,18 +29,12 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Sets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -47,10 +45,15 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
@@ -87,6 +90,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
public class ManagedLedgerTest extends MockedBookKeeperTestCase {
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class);
@@ -2215,4 +2225,27 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
assertFalse(factory.getManagedLedgers().containsKey("testManagedLedgerWithoutAutoCreate"));
}
+
+ /**
+  * Verifies that asyncCreateLedger completes the callback with TimeoutException when BookKeeper never answers
+  * (the mocked client does nothing on asyncCreateLedger).
+  */
+ @Test
+ public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception {
+     ManagedLedgerConfig config = new ManagedLedgerConfig().setMetadataOperationsTimeoutSeconds(3);
+     ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("timeout_ledger_test", config);
+
+     try {
+         BookKeeper bk = mock(BookKeeper.class);
+         doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
+         AtomicInteger response = new AtomicInteger(0);
+         CountDownLatch latch = new CountDownLatch(1);
+         ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
+             @Override
+             public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+                 response.set(rc);
+                 latch.countDown();
+             }
+         }, Collections.emptyMap());
+
+         // Fail explicitly when the callback never fires instead of falling through to the rc comparison
+         assertTrue(latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS),
+                 "create-ledger callback was not invoked by the timeout task");
+         assertEquals(response.get(), BKException.Code.TimeoutException);
+     } finally {
+         // close the ledger even when an assertion fails
+         ledger.close();
+     }
+ }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 65696f7..84cc569 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -373,6 +373,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
// corrupted at bookkeeper and managed-cursor is stuck at that ledger.
@FieldContext(dynamic = true)
private boolean autoSkipNonRecoverableData = false;
+ // operation timeout while updating managed-ledger metadata.
+ private long managedLedgerMetadataOperationsTimeoutSeconds = 60;
/*** --- Load balancer --- ****/
// Enable load balancer
@@ -1314,6 +1316,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
this.autoSkipNonRecoverableData = skipNonRecoverableLedger;
}
+ /**
+ * @return operation timeout, in seconds, while updating managed-ledger metadata
+ */
+ public long getManagedLedgerMetadataOperationsTimeoutSeconds() {
+ return managedLedgerMetadataOperationsTimeoutSeconds;
+ }
+
+ /**
+ * @param managedLedgerMetadataOperationsTimeoutSeconds
+ *            operation timeout, in seconds, while updating managed-ledger metadata
+ */
+ public void setManagedLedgerMetadataOperationsTimeoutSeconds(long managedLedgerMetadataOperationsTimeoutSeconds) {
+ this.managedLedgerMetadataOperationsTimeoutSeconds = managedLedgerMetadataOperationsTimeoutSeconds;
+ }
+
public boolean isLoadBalancerEnabled() {
return loadBalancerEnabled;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8620e82..406d3e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -755,6 +755,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
TimeUnit.MINUTES);
managedLedgerConfig.setMaxSizePerLedgerMb(2048);
+ managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+ serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());