You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/10/05 03:43:09 UTC
[pulsar] branch master updated: Improve error handling logic for
effectively once (#5271)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8e95f43 Improve error handling logic for effectively once (#5271)
8e95f43 is described below
commit 8e95f438acce495688f6e99f1e3034da572eab07
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Oct 4 20:43:02 2019 -0700
Improve error handling logic for effectively once (#5271)
* Fix a bug in message deduplication that may cause incorrect behavior
* add tests
* fix error message
* fix client backoff
* fix tests
* cleaning up
* Fix handling of BK write failures for message dedup
* tests and clean up
* refactoring code
* fixing bugs
* addressing comments
* add missing license header
---
.../service/persistent/MessageDeduplication.java | 34 ++-
.../broker/service/persistent/PersistentTopic.java | 125 ++++++---
.../service/persistent/MessageDuplicationTest.java | 180 ++++++++++++-
.../client/api/ClientDeduplicationFailureTest.java | 278 +++++++++++++++++++++
.../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 37 ++-
5 files changed, 602 insertions(+), 52 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 1a685b2..e898b4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -18,17 +18,8 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
@@ -41,14 +32,22 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Topic.PublishContext;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.ByteBuf;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
/**
* Class that contains all the logic to control and perform the deduplication on the broker side
@@ -347,6 +346,17 @@ public class MessageDeduplication {
}
}
+    /**
+     * Rolls the "pushed" sequence-id map back to the "persisted" one: every entry in
+     * highestSequencedPushed is dropped and replaced by a copy of highestSequencedPersisted.
+     * Has no effect while deduplication is disabled.
+     */
+    public void resetHighestSequenceIdPushed() {
+        if (isEnabled()) {
+            highestSequencedPushed.clear();
+            highestSequencedPersisted.keys().forEach(
+                    producerName -> highestSequencedPushed.put(producerName, highestSequencedPersisted.get(producerName)));
+        }
+    }
+
private void takeSnapshot(PositionImpl position) {
if (log.isDebugEnabled()) {
log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 12792e8..dbc5bf2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -19,22 +19,12 @@
package org.apache.pulsar.broker.service.persistent;
import com.carrotsearch.hppc.ObjectObjectHashMap;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -117,6 +107,21 @@ import org.apache.pulsar.utils.StatsOutputStream;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.BiFunction;
+
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
@@ -145,7 +150,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
- private final MessageDeduplication messageDeduplication;
+ // NOTE(review): widened from private to protected by this change; nothing visible in this diff reads it
+ // from a subclass or another class - confirm the wider access is actually needed.
+ protected final MessageDeduplication messageDeduplication;
private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
@@ -163,6 +168,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
};
+ // Number of publishes accepted by publishMessage that have not been completed yet. When it drops to zero
+ // while the topic is fenced, decrementPendingWriteOpsAndCheck resets the dedup state and un-fences the topic.
+ private final AtomicLong pendingWriteOps = new AtomicLong(0);
+
private static class TopicStatsHelper {
public double averageMsgSize;
public double aggMsgRateIn;
@@ -238,6 +245,17 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
checkReplicatedSubscriptionControllerState();
}
+    /**
+     * Test-only constructor: the managed ledger and deduplication instance come from the caller, and
+     * only the subscription/replicator maps and the compacted-topic view are initialized on top of what
+     * the superclass constructor sets up.
+     */
+    @VisibleForTesting
+    PersistentTopic(String topic, BrokerService brokerService, ManagedLedger ledger, MessageDeduplication messageDeduplication) {
+        super(topic, brokerService);
+        this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
+        this.replicators = new ConcurrentOpenHashMap<>(16, 1);
+        this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        this.messageDeduplication = messageDeduplication;
+        this.ledger = ledger;
+    }
+
private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
synchronized (dispatchRateLimiter) {
// dispatch rate limiter for topic
@@ -272,17 +290,41 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@Override
public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
+ // Reserve a pending-write slot BEFORE reading isFenced: while this call holds it the counter cannot reach
+ // zero, so decrementPendingWriteOpsAndCheck cannot un-fence the topic in the middle of this publish.
+ // The fenced, Dup and unknown paths release the slot immediately; NotDup leaves it to the ledger add
+ // callback (addComplete, or addFailed once producers are disconnected).
+ // NOTE(review): addFailed's ManagedLedgerFencedException branch never releases it.
+ pendingWriteOps.incrementAndGet();
+ if (isFenced) {
+ publishContext.completed(new TopicFencedException("fenced"), -1, -1);
+ decrementPendingWriteOpsAndCheck();
+ return;
+ }
+
MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload);
- switch (status){
+ switch (status) {
case NotDup:
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
break;
case Dup:
// Immediately acknowledge duplicated message
publishContext.completed(null, -1, -1);
+ decrementPendingWriteOpsAndCheck();
break;
+ // Any other status: duplication could not be decided (presumably the sequence id was pushed but is
+ // not persisted yet), so this publish is failed with MessageDupUnknownException.
default:
publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1, -1);
+ decrementPendingWriteOpsAndCheck();
+
+ }
+ }
+
+ /**
+ * Releases one pending-write slot taken in publishMessage. When the count reaches zero while the topic is
+ * fenced, rolls the deduplication "pushed" sequence ids back to the persisted ones and then un-fences the
+ * topic. Several threads can observe a zero count at once, so isFenced is re-checked under the topic
+ * monitor (the same monitor addFailed holds) and only one of them performs the reset.
+ */
+ private void decrementPendingWriteOpsAndCheck() {
+ long pending = pendingWriteOps.decrementAndGet();
+ if (pending == 0 && isFenced) {
+ synchronized (this) {
+ if (isFenced) {
+ // Reset before clearing isFenced so publishes admitted after un-fencing see the rolled-back ids.
+ // NOTE(review): relies on isFenced being volatile (declared outside this view) - confirm.
+ messageDeduplication.resetHighestSequenceIdPushed();
+ log.info("[{}] Un-fencing topic...", topic);
+ isFenced = false;
+ }
+
+ }
}
}
@@ -294,35 +336,50 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
// Message has been successfully persisted
messageDeduplication.recordMessagePersisted(publishContext, position);
publishContext.completed(null, position.getLedgerId(), position.getEntryId());
+
+ decrementPendingWriteOpsAndCheck();
}
@Override
- public void addFailed(ManagedLedgerException exception, Object ctx) {
- PublishContext callback = (PublishContext) ctx;
-
- if (exception instanceof ManagedLedgerAlreadyClosedException) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
- }
-
- callback.completed(new TopicClosedException(exception), -1, -1);
- return;
-
- } else {
- log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
- }
+    public synchronized void addFailed(ManagedLedgerException exception, Object ctx) {
- if (exception instanceof ManagedLedgerTerminatedException) {
- // Signal the producer that this topic is no longer available
- callback.completed(new TopicTerminatedException(exception), -1, -1);
- } else {
- // Use generic persistence exception
- callback.completed(new PersistenceException(exception), -1, -1);
- }
+        // Called when a ledger add issued by publishMessage (status NotDup) fails. Any failure fences the
+        // topic so that no further writes are accepted until the deduplication state has been rolled back.
+ // fence topic when failed to write a message to BK
+ isFenced = true;
if (exception instanceof ManagedLedgerFencedException) {
+            // Complete the failed publish before closing so the client gets an error instead of waiting on a
+            // callback that would otherwise never fire. The pending-write slot is deliberately not released:
+            // releasing it could let decrementPendingWriteOpsAndCheck un-fence a topic that is being closed.
+            log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
+            ((PublishContext) ctx).completed(new PersistenceException(exception), -1, -1);
// If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen
close();
+        } else {
+            // Disconnect all producers. The pending-write slot held by this publish is released only once they
+            // are all gone, so the topic cannot be un-fenced (and the dedup state reset) before then.
+            List<CompletableFuture<Void>> futures = Lists.newArrayList();
+            producers.forEach(producer -> futures.add(producer.disconnect()));
+            FutureUtil.waitForAll(futures).handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> {
+                decrementPendingWriteOpsAndCheck();
+                return null;
+            });
+
+            PublishContext callback = (PublishContext) ctx;
+
+            if (exception instanceof ManagedLedgerAlreadyClosedException) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
+                }
+                callback.completed(new TopicClosedException(exception), -1, -1);
+                return;
+            }
+
+            log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
+
+            if (exception instanceof ManagedLedgerTerminatedException) {
+                // Signal the producer that this topic is no longer available
+                callback.completed(new TopicTerminatedException(exception), -1, -1);
+            } else {
+                // Use generic persistence exception
+                callback.completed(new PersistenceException(exception), -1, -1);
+            }
+        }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index aa6e9d4..a29de11 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -21,17 +21,29 @@ package org.apache.pulsar.broker.service.persistent;
import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.testng.annotations.Test;
+import java.util.concurrent.ScheduledExecutorService;
+
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -115,6 +127,170 @@ public class MessageDuplicationTest {
assertEquals(lastSequenceIdPushed.longValue(), 5);
}
+    /**
+     * Drives PersistentTopic.publishMessage through successful writes, a duplicate, an "unknown" duplicate
+     * status and a failed write, and checks that the failure rolls highestSequencedPushed back to
+     * highestSequencedPersisted so that later publishes are deduplicated against persisted state only.
+     */
+    @Test
+    public void testIsDuplicateWithFailure() {
+        PulsarService pulsarService = mock(PulsarService.class);
+        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+        serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL);
+        serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS);
+        serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
+        doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
+
+        ManagedLedger managedLedger = mock(ManagedLedger.class);
+        MessageDeduplication messageDeduplication =
+                spy(new MessageDeduplication(pulsarService, mock(PersistentTopic.class), managedLedger));
+        doReturn(true).when(messageDeduplication).isEnabled();
+
+        // Run submitted tasks inline so the whole test stays on one thread.
+        ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+        doAnswer(invocation -> {
+            ((Runnable) invocation.getArguments()[0]).run();
+            return null;
+        }).when(scheduledExecutorService).submit(any(Runnable.class));
+
+        BrokerService brokerService = mock(BrokerService.class);
+        doReturn(scheduledExecutorService).when(brokerService).executor();
+        doReturn(pulsarService).when(brokerService).pulsar();
+
+        PersistentTopic persistentTopic =
+                spy(new PersistentTopic("topic-1", brokerService, managedLedger, messageDeduplication));
+
+        String producerName1 = "producer1";
+        String producerName2 = "producer2";
+
+        // producer1 seq 0, persisted at (0,1)
+        Topic.PublishContext publishContext1 = getPublishContext(producerName1, 0);
+        persistentTopic.publishMessage(getMessage(producerName1, 0), publishContext1);
+        persistentTopic.addComplete(new PositionImpl(0, 1), publishContext1);
+        verify(managedLedger, times(1)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        assertSequenceId(messageDeduplication.highestSequencedPushed.get(producerName1), 0);
+        assertSequenceId(messageDeduplication.highestSequencedPersisted.get(producerName1), 0);
+
+        // producer2 seq 1, persisted at (0,2)
+        Topic.PublishContext publishContext2 = getPublishContext(producerName2, 1);
+        persistentTopic.publishMessage(getMessage(producerName2, 1), publishContext2);
+        persistentTopic.addComplete(new PositionImpl(0, 2), publishContext2);
+        verify(managedLedger, times(2)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        assertSequenceId(messageDeduplication.highestSequencedPushed.get(producerName2), 1);
+        assertSequenceId(messageDeduplication.highestSequencedPersisted.get(producerName2), 1);
+
+        // producer1 seq 1, persisted at (0,3)
+        publishContext1 = getPublishContext(producerName1, 1);
+        persistentTopic.publishMessage(getMessage(producerName1, 1), publishContext1);
+        persistentTopic.addComplete(new PositionImpl(0, 3), publishContext1);
+        verify(managedLedger, times(3)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        assertSequenceId(messageDeduplication.highestSequencedPushed.get(producerName1), 1);
+        assertSequenceId(messageDeduplication.highestSequencedPersisted.get(producerName1), 1);
+
+        // producer1 jumps to seq 5, persisted at (0,4)
+        publishContext1 = getPublishContext(producerName1, 5);
+        persistentTopic.publishMessage(getMessage(producerName1, 5), publishContext1);
+        persistentTopic.addComplete(new PositionImpl(0, 4), publishContext1);
+        verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        assertSequenceId(messageDeduplication.highestSequencedPushed.get(producerName1), 5);
+        assertSequenceId(messageDeduplication.highestSequencedPersisted.get(producerName1), 5);
+
+        // publish dup: acknowledged immediately without a ledger write
+        publishContext1 = getPublishContext(producerName1, 0);
+        persistentTopic.publishMessage(getMessage(producerName1, 0), publishContext1);
+        verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        assertSequenceId(messageDeduplication.highestSequencedPushed.get(producerName1), 5);
+        verify(publishContext1, times(1)).completed(eq(null), eq(-1L), eq(-1L));
+
+        // publish seq 6 but don't complete it yet: pushed moves ahead of persisted
+        Topic.PublishContext inFlightContext = getPublishContext(producerName1, 6);
+        persistentTopic.publishMessage(getMessage(producerName1, 6), inFlightContext);
+        verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        assertSequenceId(messageDeduplication.highestSequencedPushed.get(producerName1), 6);
+        assertSequenceId(messageDeduplication.highestSequencedPersisted.get(producerName1), 5);
+
+        // re-publishing seq 6 while it is still in flight yields an "unknown" duplicate status
+        publishContext1 = getPublishContext(producerName1, 6);
+        persistentTopic.publishMessage(getMessage(producerName1, 6), publishContext1);
+        verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        verify(publishContext1, times(1)).completed(
+                any(MessageDeduplication.MessageDupUnknownException.class), eq(-1L), eq(-1L));
+
+        // complete the in-flight seq 6 write through the context that was actually handed to the ledger
+        persistentTopic.addComplete(new PositionImpl(0, 5), inFlightContext);
+
+        // simulate a failed write of seq 7
+        publishContext1 = getPublishContext(producerName1, 7);
+        persistentTopic.publishMessage(getMessage(producerName1, 7), publishContext1);
+        verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        persistentTopic.addFailed(new ManagedLedgerException("test"), publishContext1);
+
+        // the failure resets highestSequencedPushed to highestSequencedPersisted
+        assertEquals(messageDeduplication.highestSequencedPushed.size(), 2);
+        assertEquals(messageDeduplication.highestSequencedPersisted.size(), 2);
+        assertSequenceId(messageDeduplication.highestSequencedPushed.get(producerName1), 6);
+        assertSequenceId(messageDeduplication.highestSequencedPersisted.get(producerName1), 6);
+        assertSequenceId(messageDeduplication.highestSequencedPushed.get(producerName2), 1);
+        assertSequenceId(messageDeduplication.highestSequencedPersisted.get(producerName2), 1);
+        verify(messageDeduplication, times(1)).resetHighestSequenceIdPushed();
+
+        // seq 6 is a plain duplicate again
+        publishContext1 = getPublishContext(producerName1, 6);
+        persistentTopic.publishMessage(getMessage(producerName1, 6), publishContext1);
+        verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        verify(publishContext1, times(1)).completed(eq(null), eq(-1L), eq(-1L));
+        assertSequenceId(messageDeduplication.highestSequencedPushed.get(producerName1), 6);
+
+        // a new sequence id is accepted once the topic has been un-fenced; use a fresh position
+        publishContext1 = getPublishContext(producerName1, 8);
+        persistentTopic.publishMessage(getMessage(producerName1, 8), publishContext1);
+        verify(managedLedger, times(7)).asyncAddEntry(any(ByteBuf.class), any(), any());
+        persistentTopic.addComplete(new PositionImpl(0, 6), publishContext1);
+        assertSequenceId(messageDeduplication.highestSequencedPushed.get(producerName1), 8);
+        assertSequenceId(messageDeduplication.highestSequencedPersisted.get(producerName1), 8);
+    }
+
+    // Asserts that a sequence id recorded for a producer is present and equal to the expected value.
+    private static void assertSequenceId(Long actual, long expected) {
+        assertTrue(actual != null);
+        assertEquals(actual.longValue(), expected);
+    }
+
public ByteBuf getMessage(String producerName, long seqId) {
PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder()
.setProducerName(producerName).setSequenceId(seqId)
@@ -127,7 +303,7 @@ public class MessageDuplicationTest {
}
public Topic.PublishContext getPublishContext(String producerName, long seqId) {
- return new Topic.PublishContext() {
+ return spy(new Topic.PublishContext() {
@Override
public String getProducerName() {
return producerName;
@@ -141,6 +317,6 @@ public class MessageDuplicationTest {
public void completed(Exception e, long ledgerId, long entryId) {
}
- };
+ });
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
new file mode 100644
index 0000000..55020c6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.io.PulsarFunctionE2ETest;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * End-to-end check of broker-side message deduplication when BookKeeper writes fail: sends made while all
+ * bookies are stopped must fail, and after the bookies restart a consumer must see the messages sent before
+ * the outage followed by the ones sent after it, with none of the failed ones.
+ */
+public class ClientDeduplicationFailureTest {
+    LocalBookkeeperEnsemble bkEnsemble;
+
+    ServiceConfiguration config;
+    URL url;
+    PulsarService pulsar;
+    PulsarAdmin admin;
+    PulsarClient pulsarClient;
+    BrokerStats brokerStatsClient;
+    final String tenant = "external-repl-prop";
+    String primaryHost;
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int brokerWebServicePort = PortManager.nextFreePort();
+    private final int brokerServicePort = PortManager.nextFreePort();
+
+    // Tells the background consumer thread started by a test to exit; set in shutdown().
+    private volatile boolean stopConsuming;
+
+    private static final Logger log = LoggerFactory.getLogger(ClientDeduplicationFailureTest.class);
+
+    @BeforeMethod(timeOut = 300000)
+    void setup(Method method) throws Exception {
+        log.info("--- Setting up method {} ---", method.getName());
+        stopConsuming = false;
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager::nextFreePort);
+        bkEnsemble.start();
+
+        String brokerServiceUrl = "http://127.0.0.1:" + brokerWebServicePort;
+
+        config = spy(new ServiceConfiguration());
+        config.setClusterName("use");
+        config.setWebServicePort(Optional.of(brokerWebServicePort));
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config.setBrokerServicePort(Optional.of(brokerServicePort));
+        config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+        config.setTlsAllowInsecureConnection(true);
+        config.setAdvertisedAddress("localhost");
+        config.setLoadBalancerSheddingEnabled(false);
+        config.setAllowAutoTopicCreationType("non-partitioned");
+
+        url = new URL(brokerServiceUrl);
+        pulsar = new PulsarService(config, Optional.empty());
+        pulsar.start();
+
+        admin = PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).build();
+
+        brokerStatsClient = admin.brokerStats();
+        primaryHost = String.format("http://%s:%d", "localhost", brokerWebServicePort);
+
+        // update cluster metadata
+        ClusterData clusterData = new ClusterData(url.toString());
+        admin.clusters().createCluster(config.getClusterName(), clusterData);
+
+        // short max backoff so the producer reconnects quickly once the bookies are back
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get())
+                .maxBackoffInterval(1, TimeUnit.SECONDS)
+                .build();
+
+        TenantInfo tenantInfo = new TenantInfo();
+        tenantInfo.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+        admin.tenants().createTenant(tenant, tenantInfo);
+    }
+
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        stopConsuming = true;
+        pulsarClient.close();
+        admin.close();
+        pulsar.close();
+        bkEnsemble.stop();
+    }
+
+    @Test
+    public void testClientDeduplicationWithBkFailure() throws Exception {
+        final String namespacePortion = "dedup";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+        final String subscriptionName1 = "sub1";
+        final String subscriptionName2 = "sub2";
+        final String consumerName1 = "test-consumer-1";
+        final String consumerName2 = "test-consumer-2";
+        // Written by the background consumer thread and read by the test thread.
+        final List<Message<String>> msgRecvd = new CopyOnWriteArrayList<>();
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+        admin.namespaces().setDeduplicationStatus(replNamespace, true);
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic)
+                .producerName("test-producer-1").create();
+        Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(sourceTopic)
+                .consumerName(consumerName1).subscriptionName(subscriptionName1).subscribe();
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(sourceTopic)
+                .consumerName(consumerName2).subscriptionName(subscriptionName2).subscribe();
+
+        // Collect everything delivered on sub2 in the background until shutdown() asks the thread to stop.
+        Thread consumerThread = new Thread(() -> {
+            while (!stopConsuming) {
+                try {
+                    Message<String> msg = consumer2.receive(100, TimeUnit.MILLISECONDS);
+                    if (msg == null) {
+                        continue;
+                    }
+                    msgRecvd.add(msg);
+                    consumer2.acknowledge(msg);
+                } catch (PulsarClientException.AlreadyClosedException e) {
+                    // consumer or client already closed: nothing more can be received
+                    return;
+                } catch (PulsarClientException e) {
+                    log.error("Failed to consume message: {}", e, e);
+                }
+            }
+        }, "dedup-test-consumer");
+        consumerThread.setDaemon(true);
+        consumerThread.start();
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats topicStats = admin.topics().getStats(sourceTopic);
+                boolean c1 = topicStats != null
+                        && topicStats.subscriptions.get(subscriptionName1) != null
+                        && topicStats.subscriptions.get(subscriptionName1).consumers.size() == 1
+                        && topicStats.subscriptions.get(subscriptionName1).consumers.get(0).consumerName.equals(consumerName1);
+
+                boolean c2 = topicStats != null
+                        && topicStats.subscriptions.get(subscriptionName2) != null
+                        && topicStats.subscriptions.get(subscriptionName2).consumers.size() == 1
+                        && topicStats.subscriptions.get(subscriptionName2).consumers.get(0).consumerName.equals(consumerName2);
+                return c1 && c2;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 200);
+
+        TopicStats topicStats1 = admin.topics().getStats(sourceTopic);
+        assertTrue(topicStats1 != null);
+        assertTrue(topicStats1.subscriptions.get(subscriptionName1) != null);
+        assertEquals(topicStats1.subscriptions.get(subscriptionName1).consumers.size(), 1);
+        assertEquals(topicStats1.subscriptions.get(subscriptionName1).consumers.get(0).consumerName, consumerName1);
+        TopicStats topicStats2 = admin.topics().getStats(sourceTopic);
+        assertTrue(topicStats2 != null);
+        assertTrue(topicStats2.subscriptions.get(subscriptionName2) != null);
+        assertEquals(topicStats2.subscriptions.get(subscriptionName2).consumers.size(), 1);
+        assertEquals(topicStats2.subscriptions.get(subscriptionName2).consumers.get(0).consumerName, consumerName2);
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().sequenceId(i).value("foo-" + i).send();
+        }
+
+        for (int i = 0; i < 10; i++) {
+            Message<String> msg = consumer1.receive();
+            consumer1.acknowledge(msg);
+            assertEquals(msg.getValue(), "foo-" + i);
+            assertEquals(msg.getSequenceId(), i);
+        }
+
+        log.info("Stopping BK...");
+        bkEnsemble.stopBK();
+
+        List<CompletableFuture<MessageId>> futures = new LinkedList<>();
+        for (int i = 10; i < 20; i++) {
+            CompletableFuture<MessageId> future = producer.newMessage().sequenceId(i).value("foo-" + i).sendAsync();
+            int finalI = i;
+            future.thenRun(() -> log.error("message: {} successful", finalI)).exceptionally(throwable -> {
+                log.info("message: {} failed: {}", finalI, throwable, throwable);
+                return null;
+            });
+            futures.add(future);
+        }
+
+        for (CompletableFuture<MessageId> future : futures) {
+            try {
+                // message should not be produced successfully
+                future.join();
+                fail();
+            } catch (CompletionException expected) {
+                // expected: the write cannot be persisted while the bookies are down
+            } catch (Exception e) {
+                fail("unexpected failure of a pending send", e);
+            }
+        }
+
+        // Re-sending sequence id 10 must keep failing while the bookies are down.
+        for (int attempt = 0; attempt < 2; attempt++) {
+            try {
+                producer.newMessage().sequenceId(10).value("foo-10").send();
+                fail();
+            } catch (PulsarClientException expected) {
+                // expected
+            }
+        }
+
+        log.info("Starting BK...");
+        bkEnsemble.startBK();
+
+        for (int i = 20; i < 30; i++) {
+            producer.newMessage().sequenceId(i).value("foo-" + i).send();
+        }
+
+        MessageId lastMessageId = null;
+        for (int i = 20; i < 30; i++) {
+            Message<String> msg = consumer1.receive();
+            lastMessageId = msg.getMessageId();
+            consumer1.acknowledge(msg);
+            assertEquals(msg.getValue(), "foo-" + i);
+            assertEquals(msg.getSequenceId(), i);
+        }
+
+        // check all messages
+        retryStrategically((test) -> msgRecvd.size() >= 20, 5, 200);
+
+        assertEquals(msgRecvd.size(), 20);
+        for (int i = 0; i < 10; i++) {
+            assertEquals(msgRecvd.get(i).getValue(), "foo-" + i);
+            assertEquals(msgRecvd.get(i).getSequenceId(), i);
+        }
+        for (int i = 10; i < 20; i++) {
+            assertEquals(msgRecvd.get(i).getValue(), "foo-" + (i + 10));
+            assertEquals(msgRecvd.get(i).getSequenceId(), i + 10);
+        }
+
+        BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId;
+        MessageIdImpl messageId = (MessageIdImpl) consumer1.getLastMessageId();
+
+        assertEquals(messageId.getLedgerId(), batchMessageId.getLedgerId());
+        assertEquals(messageId.getEntryId(), batchMessageId.getEntryId());
+    }
+}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index 373f8bc..4c4d92d 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -43,6 +43,7 @@ import java.util.function.Supplier;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
@@ -53,6 +54,7 @@ import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
@@ -162,10 +164,6 @@ public class LocalBookkeeperEnsemble {
StreamStorageLifecycleComponent streamStorage;
Integer streamStoragePort = 4181;
- /**
- * @param args
- */
-
private void runZookeeper(int maxCC) throws IOException {
// create a ZooKeeper server(dataDir, dataLogDir, port)
LOG.info("Starting ZK server");
@@ -399,6 +397,37 @@ public class LocalBookkeeperEnsemble {
}
}
+ /**
+ * Shuts down every bookie server held in {@code bs} by calling {@link BookieServer#shutdown()}.
+ * Only the bookies are touched here; ZooKeeper is not stopped by this method.
+ * Counterpart of {@link #startBK()}, which recreates and restarts the bookies.
+ * NOTE(review): assumes every element of {@code bs} is non-null (bookies were started);
+ * a null entry would make this loop throw a NullPointerException. Also, the debug
+ * message says "ZK/BK" even though only bookies are stopped.
+ */
+ public void stopBK() {
+ LOG.debug("Local ZK/BK stopping ...");
+ for (BookieServer bookie : bs) {
+ bookie.shutdown();
+ }
+ }
+
+ /**
+ * Creates a fresh {@link BookieServer} for each of the {@code numberOfBookies} bookies from
+ * the existing configurations in {@code bsConfs}, stores it in {@code bs[i]} (replacing any
+ * previous instance), and starts it. Typically used to bring bookies back after
+ * {@link #stopBK()}.
+ *
+ * @throws Exception if a bookie cannot be constructed (including a second
+ *         InvalidCookieException after cleanup) or started, or if the ZooKeeper
+ *         cookie cleanup fails
+ */
+ public void startBK() throws Exception {
+ for (int i = 0; i < numberOfBookies; i++) {
+
+ try {
+ bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE);
+ } catch (InvalidCookieException e) {
+ // InvalidCookieException can happen if the machine IP has changed.
+ // The bookie here is local and always accessed from localhost, so the
+ // stale cookie can be discarded. Note: this deletes ALL cookies under
+ // /ledgers/cookies (version -1 = delete regardless of znode version).
+ for (String path : zkc.getChildren("/ledgers/cookies", false)) {
+ zkc.delete("/ledgers/cookies/" + path, -1);
+ }
+
+ // Also remove the on-disk cookie (<first journal dir>/current/VERSION).
+ // NOTE(review): assumes the first journal directory holds the cookie; the
+ // boolean result of File.delete() is ignored.
+ new File(new File(bsConfs[i].getJournalDirNames()[0], "current"), "VERSION").delete();
+
+ // Retry once after cleaning the stale cookie; a further failure propagates.
+ bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE);
+
+ }
+ bs[i].start();
+ }
+ }
+
public void stop() throws Exception {
if (null != streamStorage) {
LOG.debug("Local bk stream storage stopping ...");