You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/18 08:09:56 UTC
[pulsar-adapters] branch master updated: Fixed integration tests
(#22)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
The following commit(s) were added to refs/heads/master by this push:
new 9801c43 Fixed integration tests (#22)
9801c43 is described below
commit 9801c43898f1b31c6335382c5ea16eac200b603d
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Jun 18 01:09:50 2021 -0700
Fixed integration tests (#22)
### Motivation
KafkaApiTest is failing
### Modifications
1. conversion between Pulsar topic name and Kafka TopicPartition ended up with TopicPartition using name with "-partition-<partition idx>"
2. Seek was not working correctly:
PulsarKafkaConsumer seeks to beginning, as asked.
Clears lastReceivedOffset in the process.
on poll it checks
```
if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
resetOffsets(tp);
}
```
seek didn't update unpolledPartitions - reset offset uses default strategy to reset => seeks to the end
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
*(Please pick either of the following options)*
This change is already covered by existing tests, such as KafkaApiTest.
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
No
### Documentation
- Does this pull request introduce a new feature? NO
---
.../clients/consumer/PulsarKafkaConsumer.java | 46 ++++++++++++++--------
.../test/KafkaProducerSimpleConsumerTest.java | 2 +
.../spark/SparkStreamingPulsarReceiverTest.java | 23 +++++------
3 files changed, 44 insertions(+), 27 deletions(-)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 9ef4e96..ce9b9d2 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -47,7 +47,6 @@ import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.functions.utils.FunctionCommon;
import java.time.Duration;
import java.util.AbstractMap;
@@ -59,7 +58,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@@ -277,9 +275,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
- TopicPartition tp = new TopicPartition(
- TopicName.get(topic).getLocalName(),
- partitionIndex);
+ TopicPartition tp = normalizedTopicPartition(topic, partitionIndex);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
@@ -291,9 +287,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
// Topic has a single partition
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
.subscribeAsync();
- TopicPartition tp = new TopicPartition(
- TopicName.get(topic).getLocalName(),
- 0);
+ TopicPartition tp = normalizedTopicPartition(topic, 0);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
@@ -327,6 +321,15 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
}
+ private TopicPartition normalizedTopicPartition(TopicPartition tp) {
+ return normalizedTopicPartition(tp.topic(), tp.partition());
+ }
+
+ private TopicPartition normalizedTopicPartition(String topic, int partition) {
+ String name = TopicName.get(topic).getPartitionedTopicName();
+ return new TopicPartition(name, partition);
+ }
+
@Override
public void assign(Collection<TopicPartition> partitions) {
Set<String> topics = partitions.stream().map(p -> p.topic()).collect(Collectors.toSet());
@@ -372,7 +375,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
while (item != null) {
TopicName topicName = TopicName.get(item.consumer.getTopic());
- String topic = topicName.getLocalName();
+ String topic = topicName.getPartitionedTopicName();
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
Message<byte[]> msg = item.message;
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
@@ -504,12 +507,15 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ Preconditions.checkNotNull(offsets);
List<CompletableFuture<Void>> futures = new ArrayList<>();
applyConsumerInterceptorsOnCommit(interceptors, offsets);
- offsets.forEach((topicPartition, offsetAndMetadata) -> {
+ offsets.forEach((tp, offsetAndMetadata) -> {
+ TopicPartition topicPartition = normalizedTopicPartition(tp);
org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
- lastCommittedOffset.put(topicPartition, offsetAndMetadata);
+
+ lastCommittedOffset.put(tp, offsetAndMetadata);
futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
});
@@ -566,7 +572,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
@Override
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
- org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
+ TopicPartition topicPartition = normalizedTopicPartition(partition);
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
if (c == null) {
throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
}
@@ -594,12 +601,14 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
lastReceivedOffset.clear();
for (TopicPartition tp : partitions) {
- org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
+ TopicPartition normalizedTp = normalizedTopicPartition(tp);
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(normalizedTp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
} else {
futures.add(c.seekAsync(MessageId.earliest));
+ unpolledPartitions.add(tp);
}
}
@@ -617,12 +626,15 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
lastReceivedOffset.clear();
for (TopicPartition tp : partitions) {
- org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
+ TopicPartition normalizedTp = normalizedTopicPartition(tp);
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(normalizedTp);
+
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
} else {
futures.add(c.seekAsync(MessageId.latest));
+ unpolledPartitions.add(tp);
}
}
@@ -712,7 +724,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
@Override
public void pause(Collection<TopicPartition> partitions) {
partitions.forEach(p -> {
- org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
+ TopicPartition topicPartition = normalizedTopicPartition(p);
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
if (c != null) {
c.pause();
}
@@ -722,7 +735,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
@Override
public void resume(Collection<TopicPartition> partitions) {
partitions.forEach(p -> {
- org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
+ TopicPartition topicPartition = normalizedTopicPartition(p);
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
if (c != null) {
c.resume();
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java
index 2177021..c2a024b 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java
@@ -141,6 +141,7 @@ public class KafkaProducerSimpleConsumerTest extends ProducerConsumerBase {
producer.send(message);
}
producer.close();
+ Thread.sleep(500);
// (2) Consume using simple consumer
PulsarKafkaSimpleConsumer consumer = new PulsarKafkaSimpleConsumer(serviceUrl, 0, 0, 0, "clientId");
@@ -158,6 +159,7 @@ public class KafkaProducerSimpleConsumerTest extends ProducerConsumerBase {
.build();
FetchResponse fetchResponse = consumer.fetch(fReq);
+ Thread.sleep(500);
long lastOffset = 0;
MessageId offset = null;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topicName, partition)) {
diff --git a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
index bd619a5..443514c 100644
--- a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
+++ b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.function.Supplier;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.client.api.Consumer;
@@ -48,7 +49,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
private static final String EXPECTED_MESSAGE = "pulsar-spark test message";
@Test(dataProvider = "ServiceUrls")
- public void testReceivedMessage(String serviceUrl) throws Exception {
+ public void testReceivedMessage(Supplier<String> serviceUrl) throws Exception {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
Set<String> set = new HashSet<>();
@@ -68,14 +69,14 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
consConf.setMessageListener(msgListener);
SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
- serviceUrl,
+ serviceUrl.get(),
consConf,
new AuthenticationDisabled());
receiver.onStart();
waitForTransmission();
- PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+ PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
producer.send(EXPECTED_MESSAGE.getBytes());
@@ -85,7 +86,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
}
@Test(dataProvider = "ServiceUrls")
- public void testDefaultSettingsOfReceiver(String serviceUrl) {
+ public void testDefaultSettingsOfReceiver(Supplier<String> serviceUrl) {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
Set<String> set = new HashSet<>();
@@ -94,7 +95,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
consConf.setSubscriptionName(SUBS);
SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
- serviceUrl,
+ serviceUrl.get(),
consConf,
new AuthenticationDisabled());
@@ -103,7 +104,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
}
@Test(dataProvider = "ServiceUrls")
- public void testSharedSubscription(String serviceUrl) throws Exception {
+ public void testSharedSubscription(Supplier<String> serviceUrl) throws Exception {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
Set<String> set = new HashSet<>();
@@ -120,12 +121,12 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
});
SparkStreamingPulsarReceiver receiver1 = new SparkStreamingPulsarReceiver(
- serviceUrl,
+ serviceUrl.get(),
consConf,
new AuthenticationDisabled());
SparkStreamingPulsarReceiver receiver2 = new SparkStreamingPulsarReceiver(
- serviceUrl,
+ serviceUrl.get(),
consConf,
new AuthenticationDisabled());
@@ -133,7 +134,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
receiver2.onStart();
waitForTransmission();
- PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+ PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
for (int i = 0; i < 10; i++) {
producer.send(EXPECTED_MESSAGE.getBytes());
@@ -149,8 +150,8 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
@Test(expectedExceptions = NullPointerException.class,
expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
dataProvider = "ServiceUrls")
- public void testReceiverWhenClientConfigurationIsNull(String serviceUrl) {
- new SparkStreamingPulsarReceiver(serviceUrl, null, new AuthenticationDisabled());
+ public void testReceiverWhenClientConfigurationIsNull(Supplier<String> serviceUrl) {
+ new SparkStreamingPulsarReceiver(serviceUrl.get(), null, new AuthenticationDisabled());
}
private static void waitForTransmission() {