You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/20 07:16:46 UTC
[pulsar] branch master updated: Support deduplication on topic
level (#7821)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b410274 Support deduplication on topic level (#7821)
b410274 is described below
commit b410274554508684f4a15cc1f5a98a71d85fbe2e
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Aug 20 15:14:09 2020 +0800
Support deduplication on topic level (#7821)
### Motivation
Support setting `DeduplicationEnabled` at the topic level.
### Modifications
Support set/get/remove of the `DeduplicationEnabled` policy at the topic level.
### Verifying this change
Added a unit test to verify that set/get/remove of the `DeduplicationEnabled` policy at the topic level works as expected when the topic-level policy is enabled or disabled:
`org.apache.pulsar.broker.service.persistent.TopicDuplicationTest`
---
.../broker/admin/impl/PersistentTopicsBase.java | 15 ++
.../pulsar/broker/admin/v2/PersistentTopics.java | 59 ++++++++
.../service/persistent/MessageDeduplication.java | 7 +-
.../broker/service/persistent/PersistentTopic.java | 7 +-
.../broker/admin/MaxUnackedMessagesTest.java | 3 +-
.../service/persistent/TopicDuplicationTest.java | 160 +++++++++++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 45 ++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 78 ++++++++++
8 files changed, 371 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f6010f1..51c8952 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2189,6 +2189,21 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+    /**
+     * Store the deduplication flag in the topic-level policies of the current topic.
+     *
+     * <p>The existing policies are loaded (a fresh {@code TopicPolicies} is created when none
+     * exist yet), the flag is overwritten with {@code enabled}, and the result is written back
+     * asynchronously.
+     *
+     * @param enabled value handed to {@code TopicPolicies#setDeduplicationEnabled}; may be
+     *                {@code null} (presumably to clear the topic-level value - confirm against
+     *                {@code TopicPolicies})
+     * @return the future returned by {@code updateTopicPoliciesAsync}
+     * @throws RestException with {@code PRECONDITION_FAILED}, thrown synchronously, when the
+     *                       topic policies cache has not been initialised
+     */
+    protected CompletableFuture<Void> internalSetDeduplicationEnabled(Boolean enabled) {
+        TopicPolicies policies;
+        try {
+            policies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.error("Topic {} policies cache have not init.", topicName);
+            throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
+        }
+        // First policy ever written for this topic: start from an empty policy object.
+        if (policies == null) {
+            policies = new TopicPolicies();
+        }
+        policies.setDeduplicationEnabled(enabled);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
+    }
+
protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInSecond) {
//Validate message ttl value.
if (ttlInSecond != null && ttlInSecond.intValue() < 0) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 42dbcb8..39477c052 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1269,6 +1269,65 @@ public class PersistentTopics extends PersistentTopicsBase {
}
@GET
+    @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
+    @ApiOperation(value = "Get deduplication configuration of a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
+    public void getDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
+                                        @PathParam("tenant") String tenant,
+                                        @PathParam("namespace") String namespace,
+                                        @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies policies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        // Nothing stored at topic level: reply with an empty 204 instead of a boolean body.
+        if (!policies.isDeduplicationSet()) {
+            asyncResponse.resume(Response.noContent().build());
+            return;
+        }
+        asyncResponse.resume(policies.getDeduplicationEnabled());
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
+    @ApiOperation(value = "Set deduplication enabled on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
+    public void setDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
+                                        @PathParam("tenant") String tenant,
+                                        @PathParam("namespace") String namespace,
+                                        @PathParam("topic") @Encoded String encodedTopic,
+                                        @ApiParam(value = "DeduplicationEnabled policies for the specified topic") Boolean enabled) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetDeduplicationEnabled(enabled).whenComplete((ignored, failure) -> {
+            if (failure == null) {
+                asyncResponse.resume(Response.noContent().build());
+                return;
+            }
+            log.error("Failed updated deduplication", failure);
+            // A RestException is passed through as-is; any other failure is wrapped in one.
+            asyncResponse.resume(failure instanceof RestException ? failure : new RestException(failure));
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
+    @ApiOperation(value = "Remove deduplication configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void removeDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
+                                           @PathParam("tenant") String tenant,
+                                           @PathParam("namespace") String namespace,
+                                           @PathParam("topic") @Encoded String encodedTopic) {
+        // Removal is expressed as a "set" with a null value. The delegate validates the topic
+        // name as its first step, so the name is not validated a second time here.
+        setDeduplicationEnabled(asyncResponse, tenant, namespace, encodedTopic, null);
+    }
+
+ @GET
@Path("/{tenant}/{namespace}/{topic}/retention")
@ApiOperation(value = "Get retention configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 2fb4944..c9c215d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
@@ -421,7 +422,11 @@ public class MessageDeduplication {
private CompletableFuture<Boolean> isDeduplicationEnabled() {
TopicName name = TopicName.get(topic.getName());
-
+ //Topic level setting has higher priority than namespace level
+ TopicPolicies topicPolicies = topic.getTopicPolicies(name);
+ if (topicPolicies != null && topicPolicies.isDeduplicationSet()) {
+ return CompletableFuture.completedFuture(topicPolicies.getDeduplicationEnabled());
+ }
return pulsar.getConfigurationCache().policiesCache()
.getAsync(AdminResource.path(POLICIES, name.getNamespace())).thenApply(policies -> {
// If namespace policies have the field set, it will override the broker-level setting
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index daf08e1..74ef28e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2128,7 +2128,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
* @param topicName
* @return TopicPolicies is exist else return null.
*/
- private TopicPolicies getTopicPolicies(TopicName topicName) {
+ public TopicPolicies getTopicPolicies(TopicName topicName) {
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
@@ -2342,4 +2342,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
return maxUnackedMessagesOnSubscription;
}
+
+ /**
+ * Expose this topic's {@link MessageDeduplication} instance so that tests can inspect it.
+ *
+ * @return the {@code messageDeduplication} field of this topic
+ */
+ @VisibleForTesting
+ public MessageDeduplication getMessageDeduplication() {
+ return messageDeduplication;
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
index 93fd08c..f679f30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
@@ -305,12 +305,13 @@ public class MaxUnackedMessagesTest extends ProducerConsumerBase {
private void waitCacheInit(String topicName) throws Exception {
for (int i = 0; i < 50; i++) {
+ //wait for server init
+ Thread.sleep(1000);
try {
admin.topics().getMaxUnackedMessagesOnSubscription(topicName);
break;
} catch (Exception e) {
//ignore
- Thread.sleep(100);
}
if (i == 49) {
throw new RuntimeException("Waiting for cache initialization has timed out");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
new file mode 100644
index 0000000..faa010a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.junit.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class TopicDuplicationTest extends ProducerConsumerBase {
+ private final String testTenant = "my-property";
+ private final String testNamespace = "my-ns";
+ private final String myNamespace = testTenant + "/" + testNamespace;
+ private final String testTopic = "persistent://" + myNamespace + "/max-unacked-";
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ this.conf.setSystemTopicEnabled(true);
+ this.conf.setTopicLevelPoliciesEnabled(true);
+ this.conf.setBrokerDeduplicationEnabled(true);
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 10000)
+ public void testDuplicationApi() throws Exception {
+ final String topicName = testTopic + UUID.randomUUID().toString();
+ waitCacheInit(topicName);
+ admin.topics().createPartitionedTopic(topicName, 3);
+ Boolean enabled = admin.topics().getDeduplicationEnabled(topicName);
+ assertNull(enabled);
+
+ admin.topics().enableDeduplication(topicName, true);
+ for (int i = 0; i < 50; i++) {
+ if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ Assert.assertEquals(admin.topics().getDeduplicationEnabled(topicName), true);
+ admin.topics().disableDeduplication(topicName);
+ for (int i = 0; i < 50; i++) {
+ if (admin.topics().getDeduplicationEnabled(topicName) == null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ assertNull(admin.topics().getDeduplicationEnabled(topicName));
+ }
+
+ @Test(timeOut = 20000)
+ public void testDuplicationMethod() throws Exception {
+ final String topicName = testTopic + UUID.randomUUID().toString();
+ final String producerName = "my-producer";
+ final int maxMsgNum = 100;
+ waitCacheInit(topicName);
+ admin.topics().createPartitionedTopic(testTopic, 3);
+ //1) Start up producer and send msg.We specified the max sequenceId
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
+ .producerName(producerName).create();
+ long seq = System.currentTimeMillis();
+ for (int i = 0; i <= maxMsgNum; i++) {
+ producer.newMessage().value("msg-" + i).sequenceId(seq + i).send();
+ }
+ long maxSeq = seq + maxMsgNum;
+ //2) Max sequenceId should be recorded correctly
+ CompletableFuture<Optional<Topic>> completableFuture = pulsar.getBrokerService().getTopics().get(topicName);
+ Topic topic = completableFuture.get(1, TimeUnit.SECONDS).get();
+ PersistentTopic persistentTopic = (PersistentTopic) topic;
+ MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication();
+ messageDeduplication.checkStatus().whenComplete((res, ex) -> {
+ if (ex != null) {
+ fail("should not fail");
+ }
+ assertNotNull(messageDeduplication.highestSequencedPersisted);
+ assertNotNull(messageDeduplication.highestSequencedPushed);
+ long seqId = messageDeduplication.getLastPublishedSequenceId(producerName);
+ assertEquals(seqId, maxSeq);
+ assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(), maxSeq);
+ assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(), maxSeq);
+ }).get();
+ //3) disable the deduplication check
+ admin.topics().enableDeduplication(topicName, false);
+ for (int i = 0; i < 50; i++) {
+ if (admin.topics().getDeduplicationEnabled(topicName) != null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ for (int i = 0; i < 100; i++) {
+ producer.newMessage().value("msg-" + i).sequenceId(maxSeq + i).send();
+ }
+ //4) Max sequenceId record should be clear
+ messageDeduplication.checkStatus().whenComplete((res, ex) -> {
+ if (ex != null) {
+ fail("should not fail");
+ }
+ assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1);
+ assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0);
+ assertEquals(messageDeduplication.highestSequencedPushed.size(), 0);
+ }).get();
+
+ }
+
+ private void waitCacheInit(String topicName) throws Exception {
+ for (int i = 0; i < 50; i++) {
+ //wait for server init
+ Thread.sleep(3000);
+ try {
+ admin.topics().getDeduplicationEnabled(topicName);
+ break;
+ } catch (Exception e) {
+ //ignore
+ }
+ if (i == 49) {
+ throw new RuntimeException("Waiting for cache initialization has timed out");
+ }
+ }
+ }
+
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 485dcce..6476f27 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1800,4 +1800,49 @@ public interface Topics {
* @param topic Topic name
*/
CompletableFuture<Void> removePersistenceAsync(String topic);
+
+ /**
+ * Get the topic-level deduplication setting of a topic.
+ *
+ * @param topic Topic name
+ * @return the topic-level value, or {@code null} when no value has been set on the topic
+ * @throws PulsarAdminException if the request fails
+ */
+ Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException;
+
+ /**
+ * Get the topic-level deduplication setting of a topic asynchronously.
+ *
+ * @param topic Topic name
+ * @return a future holding the topic-level value, or {@code null} when no value has been set
+ */
+ CompletableFuture<Boolean> getDeduplicationEnabledAsync(String topic);
+
+ /**
+ * Set the topic-level deduplication flag of a topic.
+ *
+ * @param topic Topic name
+ * @param enabled value to store; pass {@code false} to turn deduplication off for the topic
+ * @throws PulsarAdminException if the request fails
+ */
+ void enableDeduplication(String topic, boolean enabled) throws PulsarAdminException;
+
+ /**
+ * Set the topic-level deduplication flag of a topic asynchronously.
+ *
+ * @param topic Topic name
+ * @param enabled value to store; pass {@code false} to turn deduplication off for the topic
+ * @return a future that completes when the request has finished
+ */
+ CompletableFuture<Void> enableDeduplicationAsync(String topic, boolean enabled);
+
+ /**
+ * Remove the topic-level deduplication setting of a topic.
+ *
+ * <p>Despite the name, this removes the topic-level value rather than storing {@code false};
+ * use {@code enableDeduplication(topic, false)} to switch deduplication off explicitly.
+ *
+ * @param topic Topic name
+ * @throws PulsarAdminException if the request fails
+ */
+ void disableDeduplication(String topic) throws PulsarAdminException;
+
+ /**
+ * Remove the topic-level deduplication setting of a topic asynchronously.
+ *
+ * <p>Despite the name, this removes the topic-level value rather than storing {@code false}.
+ *
+ * @param topic Topic name
+ * @return a future that completes when the request has finished
+ */
+ CompletableFuture<Void> disableDeduplicationAsync(String topic);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index b0dd98e..62a39b9 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1585,6 +1585,84 @@ public class TopicsImpl extends BaseResource implements Topics {
}
@Override
+    public Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException {
+        // Blocking wrapper over the async variant; waits at most readTimeoutMs.
+        try {
+            CompletableFuture<Boolean> pending = getDeduplicationEnabledAsync(topic);
+            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException failure) {
+            throw (PulsarAdminException) failure.getCause();
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before reporting the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        }
+    }
+
+ @Override
+ public CompletableFuture<Boolean> getDeduplicationEnabledAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "deduplicationEnabled");
+ final CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncGetRequest(path, new InvocationCallback<Boolean>() {
+ @Override
+ public void completed(Boolean enabled) {
+ future.complete(enabled);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+    @Override
+    public void enableDeduplication(String topic, boolean enabled) throws PulsarAdminException {
+        // Blocking wrapper over the async variant; waits at most readTimeoutMs.
+        try {
+            CompletableFuture<Void> pending = enableDeduplicationAsync(topic, enabled);
+            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException failure) {
+            throw (PulsarAdminException) failure.getCause();
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before reporting the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> enableDeduplicationAsync(String topic, boolean enabled) {
+        // POST the flag as a JSON body to the topic's deduplicationEnabled resource.
+        WebTarget target = topicPath(validateTopic(topic), "deduplicationEnabled");
+        return asyncPostRequest(target, Entity.entity(enabled, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void disableDeduplication(String topic) throws PulsarAdminException {
+        // Blocking wrapper over the async variant; waits at most readTimeoutMs.
+        try {
+            CompletableFuture<Void> pending = disableDeduplicationAsync(topic);
+            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException failure) {
+            throw (PulsarAdminException) failure.getCause();
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before reporting the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> disableDeduplicationAsync(String topic) {
+        // DELETE on the topic's deduplicationEnabled resource.
+        WebTarget target = topicPath(validateTopic(topic), "deduplicationEnabled");
+        return asyncDeleteRequest(target);
+    }
+
+ @Override
public Integer getMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException {
try {
return getMaxUnackedMessagesOnSubscriptionAsync(topic).