You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ay...@apache.org on 2023/03/29 18:34:19 UTC
[pulsar] branch branch-2.11 updated: [improve][io] KCA: option to collapse partitioned topics (#19923)
This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new b2f734f4594 [improve][io] KCA: option to collapse partitioned topics (#19923)
b2f734f4594 is described below
commit b2f734f45949131c9e5a4c36a57db026f3bafac7
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Mar 28 23:54:05 2023 -0700
[improve][io] KCA: option to collapse partitioned topics (#19923)
(cherry picked from commit 7cb48fd9d41f8606d86a0503b0130aec6b9d00d5)
---
pulsar-io/kafka-connect-adaptor/pom.xml | 1 +
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 20 ++++-
.../connect/PulsarKafkaConnectSinkConfig.java | 5 ++
.../io/kafka/connect/KafkaConnectSinkTest.java | 88 ++++++++++++++++++++++
4 files changed, 112 insertions(+), 2 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 24cf0543545..5e03308960f 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -42,6 +42,7 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>
+ <scope>compile</scope>
</dependency>
<dependency>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index bbf6bdae4e7..0f2321eaaa4 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
@@ -92,6 +93,9 @@ public class KafkaConnectSink implements Sink<GenericObject> {
protected String topicName;
private boolean sanitizeTopicName = false;
+ // This is a workaround for https://github.com/apache/pulsar/issues/19922
+ private boolean collapsePartitionedTopics = false;
+
private final Cache<String, String> sanitizedTopicCache =
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();
@@ -160,6 +164,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
topicName = kafkaSinkConfig.getTopic();
unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
+ collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics();
useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
@@ -418,8 +423,19 @@ public class KafkaConnectSink implements Sink<GenericObject> {
@SuppressWarnings("rawtypes")
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
- final int partition = sourceRecord.getPartitionIndex().orElse(0);
- final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
+ final int partition;
+ final String topic;
+
+ if (collapsePartitionedTopics
+ && sourceRecord.getTopicName().isPresent()
+ && TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) {
+ TopicName tn = TopicName.get(sourceRecord.getTopicName().get());
+ partition = tn.getPartitionIndex();
+ topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName);
+ } else {
+ partition = sourceRecord.getPartitionIndex().orElse(0);
+ topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
+ }
final Object key;
final Object value;
final Schema keySchema;
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index a6e8b5c966e..f15704de4d0 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -95,6 +95,11 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
+ "In some cases it may result in topic name collisions (topic_a and topic.a will become the same)")
private boolean sanitizeTopicName = false;
+ @FieldDoc(
+ defaultValue = "false",
+ help = "Supply kafka record with topic name without -partition- suffix for partitioned topics.")
+ private boolean collapsePartitionedTopics = false;
+
public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 9d7cb030271..775b8f2bee2 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -1567,6 +1567,94 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
assertEquals(ref.getBatchIdx(), batchIdx);
}
+ @Test
+ public void collapsePartitionedTopicEnabledTest() throws Exception {
+ testCollapsePartitionedTopic(true,
+ "persistent://a/b/fake-topic-partition-0",
+ "persistent://a/b/fake-topic",
+ 0);
+
+ testCollapsePartitionedTopic(true,
+ "persistent://a/b/fake-topic-partition-1",
+ "persistent://a/b/fake-topic",
+ 1);
+
+ testCollapsePartitionedTopic(true,
+ "persistent://a/b/fake-topic",
+ "persistent://a/b/fake-topic",
+ 0);
+
+ testCollapsePartitionedTopic(true,
+ "fake-topic-partition-5",
+ "persistent://public/default/fake-topic",
+ 5);
+ }
+
+    @Test
+    public void collapsePartitionedTopicDisabledTest() throws Exception {
+        // With collapsing disabled, every case expects the Pulsar topic name to be
+        // passed through unchanged and the Kafka partition to be 0.
+        final String[] pulsarTopics = {
+                "persistent://a/b/fake-topic-partition-0",
+                "persistent://a/b/fake-topic-partition-1",
+                "persistent://a/b/fake-topic",
+                "fake-topic-partition-5",
+        };
+        for (String pulsarTopic : pulsarTopics) {
+            testCollapsePartitionedTopic(false, pulsarTopic, pulsarTopic, 0);
+        }
+    }
+
+    /**
+     * Opens a {@link KafkaConnectSink} with {@code collapsePartitionedTopics} set to
+     * {@code isEnabled}, converts one Avro record whose topic name is {@code pulsarTopic}
+     * via {@code toSinkRecord}, and asserts the resulting Kafka topic and partition.
+     *
+     * @param isEnabled          value passed as the {@code collapsePartitionedTopics} sink option
+     * @param pulsarTopic        topic name set on the source {@link PulsarRecord}
+     * @param expectedKafkaTopic topic expected on the produced {@link SinkRecord}
+     * @param expectedPartition  Kafka partition expected on the produced {@link SinkRecord}
+     * @throws Exception if opening, converting or closing the sink fails
+     */
+    private void testCollapsePartitionedTopic(boolean isEnabled,
+                                              String pulsarTopic,
+                                              String expectedKafkaTopic,
+                                              int expectedPartition) throws Exception {
+        props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+        props.put("collapsePartitionedTopics", Boolean.toString(isEnabled));
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+        // Close the sink even when the conversion or an assertion below fails.
+        try {
+            AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+                    = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+            final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+            obj.put("field1", (byte) 10);
+            obj.put("field2", "test");
+            obj.put("field3", (short) 100);
+
+            final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
+            Message msg = mock(MessageImpl.class);
+            when(msg.getValue()).thenReturn(rec);
+            when(msg.getKey()).thenReturn("key");
+            when(msg.hasKey()).thenReturn(true);
+            when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+            final AtomicInteger status = new AtomicInteger(0);
+            Record<GenericObject> record = PulsarRecord.<String>builder()
+                    .topicName(pulsarTopic)
+                    .message(msg)
+                    .schema(pulsarAvroSchema)
+                    .ackFunction(status::incrementAndGet)
+                    .failFunction(status::decrementAndGet)
+                    .build();
+
+            SinkRecord sinkRecord = sink.toSinkRecord(record);
+
+            Assert.assertEquals(sinkRecord.topic(), expectedKafkaTopic);
+            // Compare boxed values: casting a null kafkaPartition() to int would throw an
+            // NPE instead of producing a readable assertion failure.
+            Assert.assertEquals(sinkRecord.kafkaPartition(), Integer.valueOf(expectedPartition));
+        } finally {
+            sink.close();
+        }
+    }
+
@SneakyThrows
private java.util.Date getDateFromString(String dateInString) {
SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");