You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 10:41:28 UTC
[pulsar] 19/25: Fix backward compatibility issues with batch index
acknowledgment. (#7655)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 12cca8a5c9cef45eba0cd72c3df9d30c95931af1
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jul 28 23:49:42 2020 +0800
Fix backward compatibility issues with batch index acknowledgment. (#7655)
### Motivation
Fix backward compatibility issues with batch index acknowledgment.
### Modifications
Disable batch index acknowledgment by default at the consumer side.
(cherry picked from commit fffd9f144bb14a220d17e951fea29b16ad2db103)
---
.../client/impl/BatchMessageIndexAckTest.java | 3 ++
.../apache/pulsar/client/api/ConsumerBuilder.java | 6 +++
.../pulsar/client/impl/ConsumerBuilderImpl.java | 6 +++
.../apache/pulsar/client/impl/ConsumerImpl.java | 8 ++--
.../impl/conf/ConsumerConfigurationData.java | 2 +
...t2_2.java => PulsarStandaloneTestSuite2_5.java} | 26 +++++++-----
.../backwardscompatibility/SmokeTest2_2.java | 4 ++
.../backwardscompatibility/SmokeTest2_3.java | 4 ++
.../backwardscompatibility/SmokeTest2_4.java | 4 ++
.../{SmokeTest2_2.java => SmokeTest2_5.java} | 6 ++-
.../integration/containers/PulsarContainer.java | 1 +
.../integration/topologies/PulsarTestBase.java | 46 ++++++++++++++++++++++
12 files changed, 102 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 3150f10..582d461 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -67,6 +67,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
.subscriptionName("sub")
.receiverQueueSize(100)
.subscriptionType(SubscriptionType.Shared)
+ .enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(2, TimeUnit.SECONDS)
.subscribe();
@@ -125,6 +126,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
.topic(topic)
.subscriptionName("sub")
.receiverQueueSize(100)
+ .enableBatchIndexAcknowledgment(true)
.subscribe();
@Cleanup
@@ -194,6 +196,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS)
.topic(topic)
+ .enableBatchIndexAcknowledgment(true)
.subscriptionName("test")
.subscribe();
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index def1807..9bb8ce0 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -610,6 +610,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> enableRetry(boolean retryEnable);
/**
+ * Enable or disable the batch index acknowledgment. To enable this feature must ensure batch index acknowledgment
+ * feature is enabled at the broker side.
+ */
+ ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgmentEnabled);
+
+ /**
* Consumer buffers chunk messages into memory until it receives all the chunks of the original message. While
* consuming chunk-messages, chunks from same message might not be contiguous in the stream and they might be mixed
* with other messages' chunks. so, consumer has to maintain multiple buffers to manage chunks coming from different
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 9240ab3..64eadfe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -416,6 +416,12 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
}
@Override
+ public ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgmentEnabled) {
+ conf.setBatchIndexAckEnabled(batchIndexAcknowledgmentEnabled);
+ return this;
+ }
+
+ @Override
public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) {
conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
return null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 10320da..28995ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -543,9 +543,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
ackType);
}
} else {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
- acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(),
- batchMessageId.getBatchSize(), ackType, properties);
+ if (conf.isBatchIndexAckEnabled()) {
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+ acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(),
+ batchMessageId.getBatchSize(), ackType, properties);
+ }
// other messages in batch are still pending ack.
return CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index eb82703..14595e6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -125,6 +125,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private KeySharedPolicy keySharedPolicy;
+ private boolean batchIndexAckEnabled = false;
+
@JsonIgnore
public String getSingleTopic() {
checkArgument(topicNames.size() == 1);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java
similarity index 56%
copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java
index 7c1c2a1..cf88ca2 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java
@@ -16,21 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.pulsar.tests.integration.backwardscompatibility;
-import org.testng.annotations.Test;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarStandaloneTestBase;
+import org.testng.ITest;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
-public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 {
+public class PulsarStandaloneTestSuite2_5 extends PulsarStandaloneTestBase implements ITest {
- @Test(dataProvider = "StandaloneServiceUrlAndTopics")
- public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
- super.testPublishAndConsume(serviceUrl, isPersistent);
+ @BeforeSuite
+ public void setUpCluster() throws Exception {
+ super.startCluster(PulsarContainer.PULSAR_2_5_IMAGE_NAME);
}
- @Test(dataProvider = "StandaloneServiceUrlAndTopics")
- public void testBatchMessagePublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
- super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
+ @AfterSuite
+ public void tearDownCluster() throws Exception {
+ super.stopCluster();
+ }
+ @Override
+ public String getTestName() {
+ return "pulsar-standalone-suite";
}
-
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
index 7c1c2a1..20e9926 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
@@ -33,4 +33,8 @@ public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 {
super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
}
+ @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+ public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception {
+ super.testBatchIndexAckDisabled(serviceUrl);
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java
index ab317d0..e1b37e3 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java
@@ -33,4 +33,8 @@ public class SmokeTest2_3 extends PulsarStandaloneTestSuite2_3 {
super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
}
+ @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+ public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception {
+ super.testBatchIndexAckDisabled(serviceUrl);
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java
index d74ad8e..eb77eaa 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java
@@ -33,4 +33,8 @@ public class SmokeTest2_4 extends PulsarStandaloneTestSuite2_4 {
super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
}
+ @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+ public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception {
+ super.testBatchIndexAckDisabled(serviceUrl);
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java
similarity index 83%
copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java
index 7c1c2a1..2bcf584 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.tests.integration.backwardscompatibility;
import org.testng.annotations.Test;
-public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 {
+public class SmokeTest2_5 extends PulsarStandaloneTestSuite2_5 {
@Test(dataProvider = "StandaloneServiceUrlAndTopics")
public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
@@ -33,4 +33,8 @@ public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 {
super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
}
+ @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+ public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception {
+ super.testBatchIndexAckDisabled(serviceUrl);
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 15c99cc..ae5e57a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -41,6 +41,7 @@ public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> exte
public static final int BROKER_HTTP_PORT = 8080;
public static final String DEFAULT_IMAGE_NAME = "apachepulsar/pulsar-test-latest-version:latest";
+ public static final String PULSAR_2_5_IMAGE_NAME = "apachepulsar/pulsar:2.5.0";
public static final String PULSAR_2_4_IMAGE_NAME = "apachepulsar/pulsar:2.4.0";
public static final String PULSAR_2_3_IMAGE_NAME = "apachepulsar/pulsar:2.3.0";
public static final String PULSAR_2_2_IMAGE_NAME = "apachepulsar/pulsar:2.2.0";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
index 9e75e95..da58713 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
@@ -24,13 +24,17 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.FutureUtil;
+import org.junit.Assert;
public class PulsarTestBase {
@@ -130,4 +134,46 @@ public class PulsarTestBase {
}
}
+ public void testBatchIndexAckDisabled(String serviceUrl) throws Exception {
+ String topicName = generateTopicName("test-batch-index-ack-disabled", true);
+ final int numMessages = 100;
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(serviceUrl)
+ .build()) {
+
+ try (Consumer<Integer> consumer = client.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionName("sub")
+ .receiverQueueSize(100)
+ .subscriptionType(SubscriptionType.Shared)
+ .enableBatchIndexAcknowledgment(false)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscribe();) {
+
+ try (Producer<Integer> producer = client.newProducer(Schema.INT32)
+ .topic(topicName)
+ .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS)
+ .create()) {
+
+ List<CompletableFuture<MessageId>> futures = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ futures.add(producer.sendAsync(i));
+ }
+ // Wait for all messages are publish succeed.
+ FutureUtil.waitForAll(futures).get();
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<Integer> m = consumer.receive();
+ if (i % 2 == 0) {
+ consumer.acknowledge(m);
+ }
+ }
+
+ Message<Integer> redelivery = consumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertNotNull(redelivery);
+ }
+ }
+ }
+
}