You are viewing a plain-text version of this content; its canonical link was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/10/05 03:43:09 UTC

[pulsar] branch master updated: Improve error handling logic for effectively once (#5271)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e95f43  Improve error handling logic for effectively once (#5271)
8e95f43 is described below

commit 8e95f438acce495688f6e99f1e3034da572eab07
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Oct 4 20:43:02 2019 -0700

    Improve error handling logic for effectively once (#5271)
    
    * Bug in Message Deduplication that may cause incorrect behavior
    
    * add tests
    
    * fix error message
    
    * fix client backoff
    
    * fix tests
    
    * cleaning up
    
    * Fix handling of BK write failures for message dedup
    
    * tests and clean up
    
    * refactoring code
    
    * fixing bugs
    
    * addressing comments
    
    * add missing license header
---
 .../service/persistent/MessageDeduplication.java   |  34 ++-
 .../broker/service/persistent/PersistentTopic.java | 125 ++++++---
 .../service/persistent/MessageDuplicationTest.java | 180 ++++++++++++-
 .../client/api/ClientDeduplicationFailureTest.java | 278 +++++++++++++++++++++
 .../pulsar/zookeeper/LocalBookkeeperEnsemble.java  |  37 ++-
 5 files changed, 602 insertions(+), 52 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 1a685b2..e898b4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -18,17 +18,8 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
@@ -41,14 +32,22 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.Topic.PublishContext;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 /**
  * Class that contains all the logic to control and perform the deduplication on the broker side
@@ -347,6 +346,17 @@ public class MessageDeduplication {
         }
     }
 
+    public void resetHighestSequenceIdPushed() {
+        if (!isEnabled()) {
+            return;
+        }
+
+        highestSequencedPushed.clear();
+        for (String producer : highestSequencedPersisted.keys()) {
+            highestSequencedPushed.put(producer, highestSequencedPersisted.get(producer));
+        }
+    }
+
     private void takeSnapshot(PositionImpl position) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 12792e8..dbc5bf2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -19,22 +19,12 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import com.carrotsearch.hppc.ObjectObjectHashMap;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -117,6 +107,21 @@ import org.apache.pulsar.utils.StatsOutputStream;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.BiFunction;
+
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
@@ -145,7 +150,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
 
-    private final MessageDeduplication messageDeduplication;
+    protected final MessageDeduplication messageDeduplication;
 
     private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
     private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
@@ -163,6 +168,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     };
 
+    private final AtomicLong pendingWriteOps = new AtomicLong(0);
+
     private static class TopicStatsHelper {
         public double averageMsgSize;
         public double aggMsgRateIn;
@@ -238,6 +245,17 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         checkReplicatedSubscriptionControllerState();
     }
 
+    // for testing purposes
+    @VisibleForTesting
+    PersistentTopic(String topic, BrokerService brokerService, ManagedLedger ledger, MessageDeduplication messageDeduplication) {
+        super(topic, brokerService);
+        this.ledger = ledger;
+        this.messageDeduplication = messageDeduplication;
+        this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        this.replicators = new ConcurrentOpenHashMap<>(16, 1);
+        this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
+    }
+
     private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
         synchronized (dispatchRateLimiter) {
             // dispatch rate limiter for topic
@@ -272,17 +290,41 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     @Override
     public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
+        pendingWriteOps.incrementAndGet();
+        if (isFenced) {
+            publishContext.completed(new TopicFencedException("fenced"), -1, -1);
+            decrementPendingWriteOpsAndCheck();
+            return;
+        }
+
         MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload);
-        switch (status){
+        switch (status) {
             case NotDup:
                 ledger.asyncAddEntry(headersAndPayload, this, publishContext);
                 break;
             case Dup:
                 // Immediately acknowledge duplicated message
                 publishContext.completed(null, -1, -1);
+                decrementPendingWriteOpsAndCheck();
                 break;
             default:
                 publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1, -1);
+                decrementPendingWriteOpsAndCheck();
+
+        }
+    }
+
+    private void decrementPendingWriteOpsAndCheck() {
+        long pending = pendingWriteOps.decrementAndGet();
+        if (pending == 0 && isFenced) {
+            synchronized (this) {
+                if (isFenced) {
+                    messageDeduplication.resetHighestSequenceIdPushed();
+                    log.info("[{}] Un-fencing topic...", topic);
+                    isFenced = false;
+                }
+
+            }
         }
     }
 
@@ -294,35 +336,50 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         // Message has been successfully persisted
         messageDeduplication.recordMessagePersisted(publishContext, position);
         publishContext.completed(null, position.getLedgerId(), position.getEntryId());
+
+        decrementPendingWriteOpsAndCheck();
     }
 
     @Override
-    public void addFailed(ManagedLedgerException exception, Object ctx) {
-        PublishContext callback = (PublishContext) ctx;
-
-        if (exception instanceof ManagedLedgerAlreadyClosedException) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
-            }
-
-            callback.completed(new TopicClosedException(exception), -1, -1);
-            return;
-
-        } else {
-            log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
-        }
+    public synchronized void addFailed(ManagedLedgerException exception, Object ctx) {
 
-        if (exception instanceof ManagedLedgerTerminatedException) {
-            // Signal the producer that this topic is no longer available
-            callback.completed(new TopicTerminatedException(exception), -1, -1);
-        } else {
-            // Use generic persistence exception
-            callback.completed(new PersistenceException(exception), -1, -1);
-        }
+        // fence topic when failed to write a message to BK
+        isFenced = true;
 
         if (exception instanceof ManagedLedgerFencedException) {
             // If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen
             close();
+        } else {
+
+            // close all producers
+            List<CompletableFuture<Void>> futures = Lists.newArrayList();
+            producers.forEach(producer -> futures.add(producer.disconnect()));
+            FutureUtil.waitForAll(futures).handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> {
+                decrementPendingWriteOpsAndCheck();
+                return null;
+            });
+
+            PublishContext callback = (PublishContext) ctx;
+
+            if (exception instanceof ManagedLedgerAlreadyClosedException) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
+                }
+
+                callback.completed(new TopicClosedException(exception), -1, -1);
+                return;
+
+            } else {
+                log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
+            }
+
+            if (exception instanceof ManagedLedgerTerminatedException) {
+                // Signal the producer that this topic is no longer available
+                callback.completed(new TopicTerminatedException(exception), -1, -1);
+            } else {
+                // Use generic persistence exception
+                callback.completed(new PersistenceException(exception), -1, -1);
+            }
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index aa6e9d4..a29de11 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -21,17 +21,29 @@ package org.apache.pulsar.broker.service.persistent;
 import io.netty.buffer.ByteBuf;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.protocol.Commands;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.annotations.Test;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
@@ -115,6 +127,170 @@ public class MessageDuplicationTest {
         assertEquals(lastSequenceIdPushed.longValue(), 5);
     }
 
+    @Test
+    public void testIsDuplicateWithFailure() {
+
+        PulsarService pulsarService = mock(PulsarService.class);
+        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+        serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL);
+        serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS);
+        serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
+
+        doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
+
+        ManagedLedger managedLedger = mock(ManagedLedger.class);
+        MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, mock(PersistentTopic.class), managedLedger));
+        doReturn(true).when(messageDeduplication).isEnabled();
+
+
+        ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                Object[] args = invocationOnMock.getArguments();
+                Runnable test = (Runnable) args[0];
+                test.run();
+                return null;
+            }
+        }).when(scheduledExecutorService).submit(any(Runnable.class));
+
+        BrokerService brokerService = mock(BrokerService.class);
+        doReturn(scheduledExecutorService).when(brokerService).executor();
+        doReturn(pulsarService).when(brokerService).pulsar();
+
+        PersistentTopic persistentTopic = spy(new PersistentTopic("topic-1", brokerService, managedLedger, messageDeduplication));
+
+        String producerName1 = "producer1";
+        ByteBuf byteBuf1 = getMessage(producerName1, 0);
+        Topic.PublishContext publishContext1 = getPublishContext(producerName1, 0);
+
+        String producerName2 = "producer2";
+        ByteBuf byteBuf2 = getMessage(producerName2, 1);
+        Topic.PublishContext publishContext2 = getPublishContext(producerName2, 1);
+
+        persistentTopic.publishMessage(byteBuf1, publishContext1);
+        persistentTopic.addComplete(new PositionImpl(0, 1), publishContext1);
+        verify(managedLedger, times(1)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        Long lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 0);
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 0);
+
+        persistentTopic.publishMessage(byteBuf2, publishContext2);
+        persistentTopic.addComplete(new PositionImpl(0, 2), publishContext2);
+        verify(managedLedger, times(2)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName2);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 1);
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName2);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 1);
+
+        byteBuf1 = getMessage(producerName1, 1);
+        publishContext1 = getPublishContext(producerName1, 1);
+        persistentTopic.publishMessage(byteBuf1, publishContext1);
+        persistentTopic.addComplete(new PositionImpl(0, 3), publishContext1);
+        verify(managedLedger, times(3)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 1);
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 1);
+
+        byteBuf1 = getMessage(producerName1, 5);
+        publishContext1 = getPublishContext(producerName1, 5);
+        persistentTopic.publishMessage(byteBuf1, publishContext1);
+        persistentTopic.addComplete(new PositionImpl(0, 4), publishContext1);
+        verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 5);
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 5);
+
+        // publish dup
+        byteBuf1 = getMessage(producerName1, 0);
+        publishContext1 = getPublishContext(producerName1, 0);
+        persistentTopic.publishMessage(byteBuf1, publishContext1);
+        verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 5);
+        verify(publishContext1, times(1)).completed(eq(null), eq(-1L), eq(-1L));
+
+        // publish message unknown dup status
+        byteBuf1 = getMessage(producerName1, 6);
+        publishContext1 = getPublishContext(producerName1, 6);
+        // don't complete message
+        persistentTopic.publishMessage(byteBuf1, publishContext1);
+        verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 6);
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 5);
+
+        // publish same message again
+        byteBuf1 = getMessage(producerName1, 6);
+        publishContext1 = getPublishContext(producerName1, 6);
+        persistentTopic.publishMessage(byteBuf1, publishContext1);
+        verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        verify(publishContext1, times(1)).completed(any(MessageDeduplication.MessageDupUnknownException.class), eq(-1L), eq(-1L));
+
+        // complete seq 6 message eventually
+        persistentTopic.addComplete(new PositionImpl(0, 5), publishContext1);
+
+        // simulate failure
+        byteBuf1 = getMessage(producerName1, 7);
+        publishContext1 = getPublishContext(producerName1, 7);
+        persistentTopic.publishMessage(byteBuf1, publishContext1);
+        verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), any(), any());
+
+        persistentTopic.addFailed(new ManagedLedgerException("test"), publishContext1);
+        // check highestSequencedPushed is reset
+        assertEquals(messageDeduplication.highestSequencedPushed.size(), 2);
+        assertEquals(messageDeduplication.highestSequencedPersisted.size(), 2);
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
+        assertEquals(lastSequenceIdPushed.longValue(), 6);
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
+        assertEquals(lastSequenceIdPushed.longValue(), 6);
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName2);
+        assertEquals(lastSequenceIdPushed.longValue(), 1);
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName2);
+        assertEquals(lastSequenceIdPushed.longValue(), 1);
+        verify(messageDeduplication, times(1)).resetHighestSequenceIdPushed();
+
+        // try dup
+        byteBuf1 = getMessage(producerName1, 6);
+        publishContext1 = getPublishContext(producerName1, 6);
+        persistentTopic.publishMessage(byteBuf1, publishContext1);
+        verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        verify(publishContext1, times(1)).completed(eq(null), eq(-1L), eq(-1L));
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 6);
+
+        // try new message
+        byteBuf1 = getMessage(producerName1, 8);
+        publishContext1 = getPublishContext(producerName1, 8);
+        persistentTopic.publishMessage(byteBuf1, publishContext1);
+        verify(managedLedger, times(7)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        persistentTopic.addComplete(new PositionImpl(0, 5), publishContext1);
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 8);
+        lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
+        assertTrue(lastSequenceIdPushed != null);
+        assertEquals(lastSequenceIdPushed.longValue(), 8);
+
+    }
+
     public ByteBuf getMessage(String producerName, long seqId) {
         PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder()
                 .setProducerName(producerName).setSequenceId(seqId)
@@ -127,7 +303,7 @@ public class MessageDuplicationTest {
     }
 
     public Topic.PublishContext getPublishContext(String producerName, long seqId) {
-        return new Topic.PublishContext() {
+        return spy(new Topic.PublishContext() {
             @Override
             public String getProducerName() {
                 return producerName;
@@ -141,6 +317,6 @@ public class MessageDuplicationTest {
             public void completed(Exception e, long ledgerId, long entryId) {
 
             }
-        };
+        });
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
new file mode 100644
index 0000000..55020c6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.io.PulsarFunctionE2ETest;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class ClientDeduplicationFailureTest {
+    LocalBookkeeperEnsemble bkEnsemble;
+
+    ServiceConfiguration config;
+    URL url;
+    PulsarService pulsar;
+    PulsarAdmin admin;
+    PulsarClient pulsarClient;
+    BrokerStats brokerStatsClient;
+    final String tenant = "external-repl-prop";
+    String primaryHost;
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int brokerWebServicePort = PortManager.nextFreePort();
+    private final int brokerServicePort = PortManager.nextFreePort();
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
+
+    @BeforeMethod(timeOut = 300000)
+    void setup(Method method) throws Exception {
+        log.info("--- Setting up method {} ---", method.getName());
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager::nextFreePort);
+        bkEnsemble.start();
+
+        String brokerServiceUrl = "http://127.0.0.1:" + brokerWebServicePort;
+
+        config = spy(new ServiceConfiguration());
+        config.setClusterName("use");
+        config.setWebServicePort(Optional.ofNullable(brokerWebServicePort));
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config.setBrokerServicePort(Optional.ofNullable(brokerServicePort));
+        config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+        config.setTlsAllowInsecureConnection(true);
+        config.setAdvertisedAddress("localhost");
+        config.setLoadBalancerSheddingEnabled(false);
+
+        config.setAllowAutoTopicCreationType("non-partitioned");
+
+        url = new URL(brokerServiceUrl);
+        pulsar = new PulsarService(config, Optional.empty());
+        pulsar.start();
+
+        admin = PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).build();
+
+        brokerStatsClient = admin.brokerStats();
+        primaryHost = String.format("http://%s:%d", "localhost", brokerWebServicePort);
+
+        // update cluster metadata
+        ClusterData clusterData = new ClusterData(url.toString());
+        admin.clusters().createCluster(config.getClusterName(), clusterData);
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get()).maxBackoffInterval(1, TimeUnit.SECONDS);
+        pulsarClient = clientBuilder.build();
+
+        TenantInfo tenantInfo = new TenantInfo();
+        tenantInfo.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+        admin.tenants().createTenant(tenant, tenantInfo);
+    }
+
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        pulsarClient.close();
+        admin.close();
+        pulsar.close();
+        bkEnsemble.stop();
+    }
+
+    @Test
+    public void testClientDeduplicationWithBkFailure() throws  Exception {
+        final String namespacePortion = "dedup";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+        final String subscriptionName1 = "sub1";
+        final String subscriptionName2 = "sub2";
+        final String consumerName1 = "test-consumer-1";
+        final String consumerName2 = "test-consumer-2";
+        final List<Message<String>> msgRecvd = new LinkedList<>();
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+        admin.namespaces().setDeduplicationStatus(replNamespace, true);
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic)
+                .producerName("test-producer-1").create();
+        Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(sourceTopic)
+                .consumerName(consumerName1).subscriptionName(subscriptionName1).subscribe();
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(sourceTopic)
+                .consumerName(consumerName2).subscriptionName(subscriptionName2).subscribe();
+
+        new Thread(() -> {
+            while(true) {
+                try {
+                    Message<String> msg = consumer2.receive();
+                    msgRecvd.add(msg);
+                    consumer2.acknowledge(msg);
+                } catch (PulsarClientException e) {
+                    log.error("Failed to consume message: {}", e, e);
+                }
+            }
+        }).start();
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats topicStats = admin.topics().getStats(sourceTopic);
+                boolean c1 =  topicStats!= null
+                        && topicStats.subscriptions.get(subscriptionName1) != null
+                        && topicStats.subscriptions.get(subscriptionName1).consumers.size() == 1
+                        && topicStats.subscriptions.get(subscriptionName1).consumers.get(0).consumerName.equals(consumerName1);
+
+                boolean c2 =  topicStats!= null
+                        && topicStats.subscriptions.get(subscriptionName2) != null
+                        && topicStats.subscriptions.get(subscriptionName2).consumers.size() == 1
+                        && topicStats.subscriptions.get(subscriptionName2).consumers.get(0).consumerName.equals(consumerName2);
+                return c1 && c2;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 200);
+
+        TopicStats topicStats1 = admin.topics().getStats(sourceTopic);
+        assertTrue(topicStats1!= null);
+        assertTrue(topicStats1.subscriptions.get(subscriptionName1) != null);
+        assertEquals(topicStats1.subscriptions.get(subscriptionName1).consumers.size(), 1);
+        assertEquals(topicStats1.subscriptions.get(subscriptionName1).consumers.get(0).consumerName, consumerName1);
+        TopicStats topicStats2 = admin.topics().getStats(sourceTopic);
+        assertTrue(topicStats2!= null);
+        assertTrue(topicStats2.subscriptions.get(subscriptionName2) != null);
+        assertEquals(topicStats2.subscriptions.get(subscriptionName2).consumers.size(), 1);
+        assertEquals(topicStats2.subscriptions.get(subscriptionName2).consumers.get(0).consumerName, consumerName2);
+
+        for (int i=0; i<10; i++) {
+            producer.newMessage().sequenceId(i).value("foo-" + i).send();
+        }
+
+        for (int i=0; i<10; i++) {
+            Message<String> msg = consumer1.receive();
+            consumer1.acknowledge(msg);
+            assertEquals(msg.getValue(), "foo-" + i);
+            assertEquals(msg.getSequenceId(), i);
+        }
+
+        log.info("Stopping BK...");
+        bkEnsemble.stopBK();
+
+        List<CompletableFuture<MessageId>> futures = new LinkedList<>();
+        for (int i=10; i<20; i++) {
+            CompletableFuture<MessageId> future = producer.newMessage().sequenceId(i).value("foo-" + i).sendAsync();
+            int finalI = i;
+            future.thenRun(() -> log.error("message: {} successful", finalI)).exceptionally((Function<Throwable, Void>) throwable -> {
+                log.info("message: {} failed: {}", finalI, throwable, throwable);
+                return null;
+            });
+            futures.add(future);
+        }
+
+        for (int i = 0; i < futures.size(); i++) {
+            try {
+                // message should not be produced successfully
+                futures.get(i).join();
+                fail();
+            } catch (CompletionException ex) {
+
+            } catch (Exception e) {
+                fail();
+            }
+        }
+
+        try {
+            producer.newMessage().sequenceId(10).value("foo-10").send();
+            fail();
+        } catch (PulsarClientException ex) {
+
+        }
+
+        try {
+            producer.newMessage().sequenceId(10).value("foo-10").send();
+            fail();
+        } catch (PulsarClientException ex) {
+
+        }
+
+        log.info("Starting BK...");
+        bkEnsemble.startBK();
+
+        for (int i=20; i<30; i++) {
+            producer.newMessage().sequenceId(i).value("foo-" + i).send();
+        }
+
+        MessageId lastMessageId = null;
+        for (int i=20; i<30; i++) {
+            Message<String> msg = consumer1.receive();
+            lastMessageId = msg.getMessageId();
+            consumer1.acknowledge(msg);
+            assertEquals(msg.getValue(), "foo-" + i);
+            assertEquals(msg.getSequenceId(), i);
+        }
+
+        // check all messages
+        retryStrategically((test) -> msgRecvd.size() >= 20, 5, 200);
+
+        assertEquals(msgRecvd.size(), 20);
+        for (int i=0; i<10; i++) {
+            assertEquals(msgRecvd.get(i).getValue(), "foo-" + i);
+            assertEquals(msgRecvd.get(i).getSequenceId(), i);
+        }
+        for (int i=10; i<20; i++) {
+            assertEquals(msgRecvd.get(i).getValue(), "foo-" + (i + 10));
+            assertEquals(msgRecvd.get(i).getSequenceId(), i + 10);
+        }
+
+        BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId;
+        MessageIdImpl messageId = (MessageIdImpl) consumer1.getLastMessageId();
+
+        assertEquals(messageId.getLedgerId(), batchMessageId.getLedgerId());
+        assertEquals(messageId.getEntryId(), batchMessageId.getEntryId());
+    }
+}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index 373f8bc..4c4d92d 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -43,6 +43,7 @@ import java.util.function.Supplier;
 
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
@@ -53,6 +54,7 @@ import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.server.conf.BookieConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
@@ -162,10 +164,6 @@ public class LocalBookkeeperEnsemble {
     StreamStorageLifecycleComponent streamStorage;
     Integer streamStoragePort = 4181;
 
-    /**
-     * @param args
-     */
-
     private void runZookeeper(int maxCC) throws IOException {
         // create a ZooKeeper server(dataDir, dataLogDir, port)
         LOG.info("Starting ZK server");
@@ -399,6 +397,37 @@ public class LocalBookkeeperEnsemble {
         }
     }
 
+    /**
+     * Shuts down every local bookie server held by this ensemble without touching ZooKeeper or
+     * stream storage. Tests can use this to simulate a BookKeeper outage and later restart the
+     * bookies with {@link #startBK()}.
+     */
+    public void stopBK() {
+        LOG.debug("Local BK stopping ...");
+        if (bs == null) {
+            // Bookies were never started; nothing to stop.
+            return;
+        }
+        for (BookieServer bookie : bs) {
+            if (bookie != null) {
+                bookie.shutdown();
+            }
+        }
+    }
+
+    public void startBK() throws Exception {
+        for (int i = 0; i < numberOfBookies; i++) {
+
+            try {
+                bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE);
+            } catch (InvalidCookieException e) {
+                // InvalidCookieException can happen if the machine IP has changed
+                // Since we are running here a local bookie that is always accessed
+                // from localhost, we can ignore the error
+                for (String path : zkc.getChildren("/ledgers/cookies", false)) {
+                    zkc.delete("/ledgers/cookies/" + path, -1);
+                }
+
+                // Also clean the on-disk cookie
+                new File(new File(bsConfs[i].getJournalDirNames()[0], "current"), "VERSION").delete();
+
+                // Retry to start the bookie after cleaning the old left cookie
+                bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE);
+
+            }
+            bs[i].start();
+        }
+    }
+
     public void stop() throws Exception {
         if (null != streamStorage) {
             LOG.debug("Local bk stream storage stopping ...");