You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/15 15:16:51 UTC

[pulsar] Diff for: [GitHub] sijie merged pull request #3347: Support Add-entry timeout at broker to avoid stuck topics

diff --git a/conf/broker.conf b/conf/broker.conf
index 1cf3a8a371..3c2e5acc35 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -445,6 +445,9 @@ managedLedgerMetadataOperationsTimeoutSeconds=60
 # Read entries timeout when broker tries to read messages from bookkeeper.
 managedLedgerReadEntryTimeoutSeconds=120
 
+# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
+managedLedgerAddEntryTimeoutSeconds=120
+
 ### --- Load balancer --- ###
 
 # Enable load balancer
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 16d6465dfb..06be447f9c 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -332,6 +332,9 @@ managedLedgerMetadataOperationsTimeoutSeconds=60
 # Read entries timeout when broker tries to read messages from bookkeeper.
 managedLedgerReadEntryTimeoutSeconds=120
 
+# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
+managedLedgerAddEntryTimeoutSeconds=120
+
 ### --- Load balancer --- ###
 
 loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 255b534920..8f89050516 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -58,7 +58,7 @@
     private long offloadAutoTriggerSizeThresholdBytes = -1;
     private long metadataOperationsTimeoutSeconds = 60;
     private long readEntryTimeoutSeconds = 120;
-
+    private long addEntryTimeoutSeconds = 120;
     private DigestType digestType = DigestType.CRC32C;
     private byte[] password = "".getBytes(Charsets.UTF_8);
     private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
@@ -554,4 +554,18 @@ public ManagedLedgerConfig setReadEntryTimeoutSeconds(long readEntryTimeoutSecon
         this.readEntryTimeoutSeconds = readEntryTimeoutSeconds;
         return this;
     }
+    
+    public long getAddEntryTimeoutSeconds() {
+        return addEntryTimeoutSeconds;
+    }
+
+    /**
+     * Add-entry timeout after which the add-entry callback will be failed if the add-entry has not succeeded.
+     * 
+     * @param addEntryTimeoutSeconds
+     */
+    public ManagedLedgerConfig setAddEntryTimeoutSeconds(long addEntryTimeoutSeconds) {
+        this.addEntryTimeoutSeconds = addEntryTimeoutSeconds;
+        return this;
+    }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index dd54630da0..26b3acc69d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -20,6 +20,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.Math.min;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
 import java.time.Clock;
@@ -105,6 +106,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -166,6 +168,8 @@
     final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;
 
     final EntryCache entryCache;
+    
+    private ScheduledFuture<?> timeoutTask;
 
     /**
      * This lock is held while the ledgers list is updated asynchronously on the metadata store. Since we use the store
@@ -328,6 +332,28 @@ public void operationFailed(MetaStoreException e) {
                 }
             }
         });
+        
+        scheduleTimeoutTask();
+    }
+
+    private void scheduleTimeoutTask() {
+        long timeoutSec = config.getAddEntryTimeoutSeconds();
+        // disable timeout task checker if timeout <= 0
+        if (timeoutSec > 0) {
+            this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(() -> {
+                OpAddEntry opAddEntry = pendingAddEntries.peek();
+                if (opAddEntry != null) {
+                    boolean isTimedOut = opAddEntry.lastInitTime != -1
+                            && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec
+                            && opAddEntry.completed == FALSE;
+                    if (isTimedOut) {
+                        log.error("Failed to add entry for ledger {} in time-out {} sec",
+                                (opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
+                        opAddEntry.handleAddFailure(opAddEntry.ledger);
+                    }
+                }
+            }, config.getAddEntryTimeoutSeconds(), config.getAddEntryTimeoutSeconds(), TimeUnit.SECONDS);
+        }
     }
 
     private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
@@ -1146,6 +1172,10 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
 
             closeAllCursors(callback, ctx);
         }, null);
+        
+        if (this.timeoutTask != null) {
+            this.timeoutTask.cancel(false);
+        }
     }
 
     private void closeAllCursors(CloseCallback callback, final Object ctx) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 5d9d57ce7f..0cef300198 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -23,7 +23,10 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -35,6 +38,8 @@
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
 
 /**
  * Handles the life-cycle of an addEntry() operation.
@@ -42,7 +47,7 @@
  */
 class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
     private ManagedLedgerImpl ml;
-    private LedgerHandle ledger;
+    LedgerHandle ledger;
     private long entryId;
 
     @SuppressWarnings("unused")
@@ -50,6 +55,11 @@
     private Object ctx;
     private boolean closeWhenDone;
     private long startTime;
+    volatile long lastInitTime;
+    private static final AtomicIntegerFieldUpdater<OpAddEntry> COMPLETED_UPDATER =
+        AtomicIntegerFieldUpdater.newUpdater(OpAddEntry.class, "completed");
+    @SuppressWarnings("unused")
+    volatile int completed = FALSE;
     ByteBuf data;
     private int dataLength;
 
@@ -67,6 +77,7 @@ public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCall
         op.closeWhenDone = false;
         op.entryId = -1;
         op.startTime = System.nanoTime();
+        op.completed = FALSE;
         ml.mbean.addAddEntrySample(op.dataLength);
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
@@ -86,6 +97,7 @@ public void initiate() {
         ByteBuf duplicateBuffer = data.retainedDuplicate();
         // duplicatedBuffer has refCnt=1 at this point
 
+        lastInitTime = System.nanoTime();
         ledger.asyncAddEntry(duplicateBuffer, this, ctx);
 
         // Internally, asyncAddEntry() is refCnt neutral to respect to the passed buffer and it will keep a ref on it
@@ -95,6 +107,9 @@ public void initiate() {
     }
 
     public void failed(ManagedLedgerException e) {
+        if (!checkAndCompleteTimeoutTask()) {
+            return;
+        }
         AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
         data.release();
         if (cb != null) {
@@ -119,17 +134,11 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
         }
 
         if (rc != BKException.Code.OK) {
-            // If we get a write error, we will try to create a new ledger and re-submit the pending writes. If the
-            // ledger creation fails (persistent bk failure, another instanche owning the ML, ...), then the writes will
-            // be marked as failed.
-            ml.mbean.recordAddEntryError();
-
-            ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() -> {
-                // Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock
-                // from a BK callback.
-                ml.ledgerClosed(lh);
-            }));
+            handleAddFailure(lh);
         } else {
+            if(!checkAndCompleteTimeoutTask()) {
+                return;
+            }
             // Trigger addComplete callback in a thread hashed on the managed ledger name
             ml.getExecutor().executeOrdered(ml.getName(), this);
         }
@@ -200,6 +209,40 @@ private void updateLatency() {
         ml.mbean.addAddEntryLatencySample(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
     }
 
+    /**
+     * Atomically marks this add-entry operation as completed if it has not been completed already.
+     * 
+     * @return true if this call completed the operation, false if it was already completed.
+     */
+    private boolean checkAndCompleteTimeoutTask() {
+        if (!COMPLETED_UPDATER.compareAndSet(this, FALSE, TRUE)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Add-entry already completed for {}-{}", this.ledger != null ? this.ledger.getId() : -1,
+                        this.entryId);
+            }
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Handles an add failure on the given ledger. It is triggered when an add-entry fails or times out.
+     * 
+     * @param ledger
+     */
+    void handleAddFailure(final LedgerHandle ledger) {
+        // If we get a write error, we will try to create a new ledger and re-submit the pending writes. If the
+        // ledger creation fails (persistent bk failure, another instance owning the ML, ...), then the writes will
+        // be marked as failed.
+        ml.mbean.recordAddEntryError();
+
+        ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() -> {
+            // Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock
+            // from a BK callback.
+            ml.ledgerClosed(ledger);
+        }));
+    }
+    
     private final Handle<OpAddEntry> recyclerHandle;
 
     private OpAddEntry(Handle<OpAddEntry> recyclerHandle) {
@@ -220,8 +263,10 @@ public void recycle() {
         callback = null;
         ctx = null;
         closeWhenDone = false;
+        completed = FALSE;
         entryId = -1;
         startTime = -1;
+        lastInitTime = -1;
         recyclerHandle.recycle(this);
     }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index cedd3b036b..2397a62a97 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -59,7 +59,7 @@
     long lastEntry = -1;
     boolean fenced = false;
 
-    PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
+    public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
                            DigestType digest, byte[] passwd) throws GeneralSecurityException {
         super(bk, id, new LedgerMetadata(3, 3, 2, DigestType.MAC, "".getBytes()), DigestType.MAC, "".getBytes(),
                 EnumSet.noneOf(WriteFlag.class));
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index a16dcd2c4c..6ed13a59d6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -22,6 +22,7 @@
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -34,8 +35,10 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.charset.Charset;
+import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -46,10 +49,12 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -57,6 +62,13 @@
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
@@ -2324,4 +2336,74 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
 
         ledger.close();
     }
+
+    /**
+     * Verifies that if the bk-client doesn't complete the add-entry within the given timeout, the broker is resilient
+     * enough to create a new ledger and add the entry successfully.
+     * 
+     * 
+     * @throws Exception
+     */
+    @Test(timeOut = 20000)
+    public void testManagedLedgerWithAddEntryTimeOut() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig().setAddEntryTimeoutSeconds(1);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("timeout_ledger_test", config);
+
+        BookKeeper bk = mock(BookKeeper.class);
+        doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
+
+        PulsarMockBookKeeper bkClient = mock(PulsarMockBookKeeper.class);
+        ClientConfiguration conf = new ClientConfiguration();
+        doReturn(conf).when(bkClient).getConf();
+        class MockLedgerHandle extends PulsarMockLedgerHandle {
+            public MockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd)
+                    throws GeneralSecurityException {
+                super(bk, id, digest, passwd);
+            }
+
+            @Override
+            public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) {
+                // do nothing
+            }
+
+            @Override
+            public void asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback cb, Object ctx) {
+                cb.closeComplete(BKException.Code.OK, this, ctx);
+            }
+        }
+        MockLedgerHandle ledgerHandle = mock(MockLedgerHandle.class);
+        final String data = "data";
+        doNothing().when(ledgerHandle).asyncAddEntry(data.getBytes(), null, null);
+        AtomicBoolean addSuccess = new AtomicBoolean();
+
+        setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", ledgerHandle);
+
+        final int totalAddEntries = 1;
+        CountDownLatch latch = new CountDownLatch(totalAddEntries);
+        ledger.asyncAddEntry(data.getBytes(), new AddEntryCallback() {
+
+            @Override
+            public void addComplete(Position position, Object ctx) {
+                addSuccess.set(true);
+                latch.countDown();
+            }
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                latch.countDown();
+            }
+        }, null);
+
+        latch.await();
+
+        assertTrue(addSuccess.get());
+
+        setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", null);
+    }
+
+    private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
+        Field field = clazz.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(classObj, fieldValue);
+    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 96259870dd..048aecd2dd 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -757,14 +757,17 @@
         doc = "operation timeout while updating managed-ledger metadata."
     )
     private long managedLedgerMetadataOperationsTimeoutSeconds = 60;
+
     @FieldContext(
             category = CATEGORY_STORAGE_ML,
             doc = "Read entries timeout when broker tries to read messages from bookkeeper "
-                    + "(disable timeout by setting readTimeoutSeconds <= 0)"
+                    + "(0 to disable it)"
         )
-    private long managedLedgerReadEntryTimeoutSeconds = 60;
+    private long managedLedgerReadEntryTimeoutSeconds = 120;
         
-    
+    @FieldContext(category = CATEGORY_STORAGE_ML, 
+            doc = "Add entry timeout when broker tries to publish message to bookkeeper.(0 to disable it)")
+    private long managedLedgerAddEntryTimeoutSeconds = 120;
 
     /*** --- Load balancer --- ****/
     @FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b2b5103a9c..ff2a8dcce7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -745,6 +745,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
             managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
                     serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
             managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+            managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
             managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
             managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
             managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index e0130d2fa0..0720d3e95c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -25,6 +25,7 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
@@ -290,5 +291,11 @@ public static void retryStrategically(Predicate<Void> predicate, int retryCount,
         }
     }
 
+    public static void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
+        Field field = clazz.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(classObj, fieldValue);
+    }
+    
     private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 6988863eb4..feb5b9e4f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -23,9 +23,12 @@
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -33,6 +36,7 @@
 
 import java.lang.reflect.Field;
 import java.net.URI;
+import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.IdentityHashMap;
 import java.util.List;
@@ -46,16 +50,24 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -757,4 +769,66 @@ public void testOperationTimeout() throws PulsarClientException {
         }
     }
 
+    @Test(timeOut = 20000)
+    public void testAddEntryOperationTimeout() throws Exception {
+
+        log.info("-- Starting {} test --", methodName);
+
+        conf.setManagedLedgerAddEntryTimeoutSeconds(1);
+
+        final String topicName = "persistent://my-property/my-ns/addEntryTimeoutTopic";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscriber-name").subscribe();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+
+        class MockLedgerHandle extends PulsarMockLedgerHandle {
+            public MockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd)
+                    throws GeneralSecurityException {
+                super(bk, id, digest, passwd);
+            }
+
+            @Override
+            public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) {
+                // do nothing
+            }
+
+            @Override
+            public void asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback cb, Object ctx) {
+                cb.closeComplete(BKException.Code.OK, this, ctx);
+            }
+        }
+        MockLedgerHandle ledgerHandle = mock(MockLedgerHandle.class);
+        final byte[] data = "data".getBytes();
+        // this makes the first entry time out; the managed-ledger then creates a new ledger and the next add-entry
+        // should succeed.
+        doNothing().when(ledgerHandle).asyncAddEntry(data, null, null);
+
+        MockedPulsarServiceBaseTest.setFieldValue(ManagedLedgerImpl.class, ml, "currentLedger", ledgerHandle);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean addedSuccessfully = new AtomicBoolean(false);
+        producer.sendAsync(data).handle((res, ex) -> {
+            if (ex == null) {
+                addedSuccessfully.set(true);
+            } else {
+                log.error("add-entry failed for {}", methodName, ex);
+            }
+            latch.countDown();
+            return null;
+        });
+        latch.await();
+
+        // broker should be resilient enough to add-entry timeout and add entry successfully.
+        assertTrue(addedSuccessfully.get());
+
+        byte[] receivedData = consumer.receive().getData();
+        assertEquals(receivedData, data);
+
+        producer.close();
+        consumer.close();
+    }
+    
 }
diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml
index bbb3383ff9..616873bb47 100644
--- a/site/_data/config/broker.yaml
+++ b/site/_data/config/broker.yaml
@@ -245,6 +245,9 @@ configs:
 - name: managedLedgerReadEntryTimeoutSeconds
   default: '120'
   description: Read entries timeout when broker tries to read messages from bookkeeper.
+- name: managedLedgerAddEntryTimeoutSeconds
+  default: '120'
+  description: Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
 - name: loadBalancerEnabled
   default: 'true'
   description: Enable load balancer


With regards,
Apache Git Services