You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/15 15:16:51 UTC
[pulsar] Diff for: [GitHub] sijie merged pull request #3347: Support
Add-entry timeout at broker to avoid stuck topics
diff --git a/conf/broker.conf b/conf/broker.conf
index 1cf3a8a371..3c2e5acc35 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -445,6 +445,9 @@ managedLedgerMetadataOperationsTimeoutSeconds=60
# Read entries timeout when broker tries to read messages from bookkeeper.
managedLedgerReadEntryTimeoutSeconds=120
+# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
+managedLedgerAddEntryTimeoutSeconds=120
+
### --- Load balancer --- ###
# Enable load balancer
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 16d6465dfb..06be447f9c 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -332,6 +332,9 @@ managedLedgerMetadataOperationsTimeoutSeconds=60
# Read entries timeout when broker tries to read messages from bookkeeper.
managedLedgerReadEntryTimeoutSeconds=120
+# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
+managedLedgerAddEntryTimeoutSeconds=120
+
### --- Load balancer --- ###
loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 255b534920..8f89050516 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -58,7 +58,7 @@
private long offloadAutoTriggerSizeThresholdBytes = -1;
private long metadataOperationsTimeoutSeconds = 60;
private long readEntryTimeoutSeconds = 120;
-
+ private long addEntryTimeoutSeconds = 120;
private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
@@ -554,4 +554,18 @@ public ManagedLedgerConfig setReadEntryTimeoutSeconds(long readEntryTimeoutSecon
this.readEntryTimeoutSeconds = readEntryTimeoutSeconds;
return this;
}
+
+ public long getAddEntryTimeoutSeconds() {
+ return addEntryTimeoutSeconds;
+ }
+
+ /**
+ * Add-entry timeout after which add-entry callback will be failed if add-entry is not succeeded.
+ *
+ * @param addEntryTimeoutSeconds
+ */
+ public ManagedLedgerConfig setAddEntryTimeoutSeconds(long addEntryTimeoutSeconds) {
+ this.addEntryTimeoutSeconds = addEntryTimeoutSeconds;
+ return this;
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index dd54630da0..26b3acc69d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import java.time.Clock;
@@ -105,6 +106,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -166,6 +168,8 @@
final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;
final EntryCache entryCache;
+
+ private ScheduledFuture<?> timeoutTask;
/**
* This lock is held while the ledgers list is updated asynchronously on the metadata store. Since we use the store
@@ -328,6 +332,28 @@ public void operationFailed(MetaStoreException e) {
}
}
});
+
+ scheduleTimeoutTask();
+ }
+
+ private void scheduleTimeoutTask() {
+ long timeoutSec = config.getAddEntryTimeoutSeconds();
+ // disable timeout task checker if timeout <= 0
+ if (timeoutSec > 0) {
+ this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(() -> {
+ OpAddEntry opAddEntry = pendingAddEntries.peek();
+ if (opAddEntry != null) {
+ boolean isTimedOut = opAddEntry.lastInitTime != -1
+ && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec
+ && opAddEntry.completed == FALSE;
+ if (isTimedOut) {
+ log.error("Failed to add entry for ledger {} in time-out {} sec",
+ (opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
+ opAddEntry.handleAddFailure(opAddEntry.ledger);
+ }
+ }
+ }, config.getAddEntryTimeoutSeconds(), config.getAddEntryTimeoutSeconds(), TimeUnit.SECONDS);
+ }
}
private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
@@ -1146,6 +1172,10 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
closeAllCursors(callback, ctx);
}, null);
+
+ if (this.timeoutTask != null) {
+ this.timeoutTask.cancel(false);
+ }
}
private void closeAllCursors(CloseCallback callback, final Object ctx) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 5d9d57ce7f..0cef300198 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -23,7 +23,10 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
+
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -35,6 +38,8 @@
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
/**
* Handles the life-cycle of an addEntry() operation.
@@ -42,7 +47,7 @@
*/
class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
private ManagedLedgerImpl ml;
- private LedgerHandle ledger;
+ LedgerHandle ledger;
private long entryId;
@SuppressWarnings("unused")
@@ -50,6 +55,11 @@
private Object ctx;
private boolean closeWhenDone;
private long startTime;
+ volatile long lastInitTime;
+ private static final AtomicIntegerFieldUpdater<OpAddEntry> COMPLETED_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(OpAddEntry.class, "completed");
+ @SuppressWarnings("unused")
+ volatile int completed = FALSE;
ByteBuf data;
private int dataLength;
@@ -67,6 +77,7 @@ public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCall
op.closeWhenDone = false;
op.entryId = -1;
op.startTime = System.nanoTime();
+ op.completed = FALSE;
ml.mbean.addAddEntrySample(op.dataLength);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
@@ -86,6 +97,7 @@ public void initiate() {
ByteBuf duplicateBuffer = data.retainedDuplicate();
// duplicatedBuffer has refCnt=1 at this point
+ lastInitTime = System.nanoTime();
ledger.asyncAddEntry(duplicateBuffer, this, ctx);
// Internally, asyncAddEntry() is refCnt neutral to respect to the passed buffer and it will keep a ref on it
@@ -95,6 +107,9 @@ public void initiate() {
}
public void failed(ManagedLedgerException e) {
+ if (!checkAndCompleteTimeoutTask()) {
+ return;
+ }
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
data.release();
if (cb != null) {
@@ -119,17 +134,11 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
}
if (rc != BKException.Code.OK) {
- // If we get a write error, we will try to create a new ledger and re-submit the pending writes. If the
- // ledger creation fails (persistent bk failure, another instanche owning the ML, ...), then the writes will
- // be marked as failed.
- ml.mbean.recordAddEntryError();
-
- ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() -> {
- // Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock
- // from a BK callback.
- ml.ledgerClosed(lh);
- }));
+ handleAddFailure(lh);
} else {
+ if(!checkAndCompleteTimeoutTask()) {
+ return;
+ }
// Trigger addComplete callback in a thread hashed on the managed ledger name
ml.getExecutor().executeOrdered(ml.getName(), this);
}
@@ -200,6 +209,40 @@ private void updateLatency() {
ml.mbean.addAddEntryLatencySample(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
+ /**
+ * It cancels timeout task and checks if add-entry operation is not completed yet.
+ *
+ * @return true if task is not already completed else returns false.
+ */
+ private boolean checkAndCompleteTimeoutTask() {
+ if (!COMPLETED_UPDATER.compareAndSet(this, FALSE, TRUE)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Add-entry already completed for {}-{}", this.ledger != null ? this.ledger.getId() : -1,
+ this.entryId);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * It handles add failure on the given ledger. it can be triggered when add-entry fails or times out.
+ *
+ * @param ledger
+ */
+ void handleAddFailure(final LedgerHandle ledger) {
+ // If we get a write error, we will try to create a new ledger and re-submit the pending writes. If the
+ // ledger creation fails (persistent bk failure, another instanche owning the ML, ...), then the writes will
+ // be marked as failed.
+ ml.mbean.recordAddEntryError();
+
+ ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() -> {
+ // Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock
+ // from a BK callback.
+ ml.ledgerClosed(ledger);
+ }));
+ }
+
private final Handle<OpAddEntry> recyclerHandle;
private OpAddEntry(Handle<OpAddEntry> recyclerHandle) {
@@ -220,8 +263,10 @@ public void recycle() {
callback = null;
ctx = null;
closeWhenDone = false;
+ completed = FALSE;
entryId = -1;
startTime = -1;
+ lastInitTime = -1;
recyclerHandle.recycle(this);
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index cedd3b036b..2397a62a97 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -59,7 +59,7 @@
long lastEntry = -1;
boolean fenced = false;
- PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
+ public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
DigestType digest, byte[] passwd) throws GeneralSecurityException {
super(bk, id, new LedgerMetadata(3, 3, 2, DigestType.MAC, "".getBytes()), DigestType.MAC, "".getBytes(),
EnumSet.noneOf(WriteFlag.class));
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index a16dcd2c4c..6ed13a59d6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -22,6 +22,7 @@
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -34,8 +35,10 @@
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.charset.Charset;
+import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -46,10 +49,12 @@
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -57,6 +62,13 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
@@ -2324,4 +2336,74 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
}
+
+ /**
+ * It verifies that if bk-client doesn't complete the add-entry in given time out then broker is resilient enought
+ * to create new ledger and add entry successfully.
+ *
+ *
+ * @throws Exception
+ */
+ @Test(timeOut = 20000)
+ public void testManagedLedgerWithAddEntryTimeOut() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig().setAddEntryTimeoutSeconds(1);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("timeout_ledger_test", config);
+
+ BookKeeper bk = mock(BookKeeper.class);
+ doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
+
+ PulsarMockBookKeeper bkClient = mock(PulsarMockBookKeeper.class);
+ ClientConfiguration conf = new ClientConfiguration();
+ doReturn(conf).when(bkClient).getConf();
+ class MockLedgerHandle extends PulsarMockLedgerHandle {
+ public MockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd)
+ throws GeneralSecurityException {
+ super(bk, id, digest, passwd);
+ }
+
+ @Override
+ public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) {
+ // do nothing
+ }
+
+ @Override
+ public void asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback cb, Object ctx) {
+ cb.closeComplete(BKException.Code.OK, this, ctx);
+ }
+ }
+ MockLedgerHandle ledgerHandle = mock(MockLedgerHandle.class);
+ final String data = "data";
+ doNothing().when(ledgerHandle).asyncAddEntry(data.getBytes(), null, null);
+ AtomicBoolean addSuccess = new AtomicBoolean();
+
+ setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", ledgerHandle);
+
+ final int totalAddEntries = 1;
+ CountDownLatch latch = new CountDownLatch(totalAddEntries);
+ ledger.asyncAddEntry(data.getBytes(), new AddEntryCallback() {
+
+ @Override
+ public void addComplete(Position position, Object ctx) {
+ addSuccess.set(true);
+ latch.countDown();
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ latch.countDown();
+ }
+ }, null);
+
+ latch.await();
+
+ assertTrue(addSuccess.get());
+
+ setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", null);
+ }
+
+ private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
+ Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(classObj, fieldValue);
+ }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 96259870dd..048aecd2dd 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -757,14 +757,17 @@
doc = "operation timeout while updating managed-ledger metadata."
)
private long managedLedgerMetadataOperationsTimeoutSeconds = 60;
+
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Read entries timeout when broker tries to read messages from bookkeeper "
- + "(disable timeout by setting readTimeoutSeconds <= 0)"
+ + "(0 to disable it)"
)
- private long managedLedgerReadEntryTimeoutSeconds = 60;
+ private long managedLedgerReadEntryTimeoutSeconds = 120;
-
+ @FieldContext(category = CATEGORY_STORAGE_ML,
+ doc = "Add entry timeout when broker tries to publish message to bookkeeper.(0 to disable it)")
+ private long managedLedgerAddEntryTimeoutSeconds = 120;
/*** --- Load balancer --- ****/
@FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b2b5103a9c..ff2a8dcce7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -745,6 +745,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+ managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index e0130d2fa0..0720d3e95c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -25,6 +25,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
@@ -290,5 +291,11 @@ public static void retryStrategically(Predicate<Void> predicate, int retryCount,
}
}
+ public static void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
+ Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(classObj, fieldValue);
+ }
+
private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 6988863eb4..feb5b9e4f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -23,9 +23,12 @@
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -33,6 +36,7 @@
import java.lang.reflect.Field;
import java.net.URI;
+import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
@@ -46,16 +50,24 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
@@ -757,4 +769,66 @@ public void testOperationTimeout() throws PulsarClientException {
}
}
+ @Test(timeOut = 20000)
+ public void testAddEntryOperationTimeout() throws Exception {
+
+ log.info("-- Starting {} test --", methodName);
+
+ conf.setManagedLedgerAddEntryTimeoutSeconds(1);
+
+ final String topicName = "persistent://my-property/my-ns/addEntryTimeoutTopic";
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").subscribe();
+
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+
+ class MockLedgerHandle extends PulsarMockLedgerHandle {
+ public MockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd)
+ throws GeneralSecurityException {
+ super(bk, id, digest, passwd);
+ }
+
+ @Override
+ public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) {
+ // do nothing
+ }
+
+ @Override
+ public void asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback cb, Object ctx) {
+ cb.closeComplete(BKException.Code.OK, this, ctx);
+ }
+ }
+ MockLedgerHandle ledgerHandle = mock(MockLedgerHandle.class);
+ final byte[] data = "data".getBytes();
+ // this will make first entry to be timed out but then managed-ledger will create a new ledger and next time add
+ // entry should be successful.
+ doNothing().when(ledgerHandle).asyncAddEntry(data, null, null);
+
+ MockedPulsarServiceBaseTest.setFieldValue(ManagedLedgerImpl.class, ml, "currentLedger", ledgerHandle);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean addedSuccessfully = new AtomicBoolean(false);
+ producer.sendAsync(data).handle((res, ex) -> {
+ if (ex == null) {
+ addedSuccessfully.set(true);
+ } else {
+ log.error("add-entry failed for {}", methodName, ex);
+ }
+ latch.countDown();
+ return null;
+ });
+ latch.await();
+
+ // broker should be resilient enough to add-entry timeout and add entry successfully.
+ assertTrue(addedSuccessfully.get());
+
+ byte[] receivedData = consumer.receive().getData();
+ assertEquals(receivedData, data);
+
+ producer.close();
+ consumer.close();
+ }
+
}
diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml
index bbb3383ff9..616873bb47 100644
--- a/site/_data/config/broker.yaml
+++ b/site/_data/config/broker.yaml
@@ -245,6 +245,9 @@ configs:
- name: managedLedgerReadEntryTimeoutSeconds
default: '120'
description: Read entries timeout when broker tries to read messages from bookkeeper.
+- name: managedLedgerAddEntryTimeoutSeconds
+ default: '120'
+ description: Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
- name: loadBalancerEnabled
default: 'true'
description: Enable load balancer
With regards,
Apache Git Services