Posted to commits@pulsar.apache.org by "swamymavuri (via GitHub)" <gi...@apache.org> on 2023/03/15 18:35:48 UTC

[GitHub] [pulsar-adapters] swamymavuri opened a new pull request, #46: Fix [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

swamymavuri opened a new pull request, #46:
URL: https://github.com/apache/pulsar-adapters/pull/46

   … for producer and consumer
   
   <!--
   ### Contribution Checklist
     
     - Name the pull request in the form "[Issue XYZ][component] Title of the pull request", where *XYZ* should be replaced by the actual issue number.
       Skip *Issue XYZ* if there is no associated github issue for this pull request.
       Skip *component* if you are unsure about which is the best component. E.g. `[docs] Fix typo in produce method`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   *(If this PR fixes a github issue, please add `Fixes #<xyz>`.)*
   
   Fixes #<xyz>
   
   *(or if this PR is one task of a github issue, please add `Master Issue: #<xyz>` to link to the master issue.)*
   
   Master Issue: #<xyz>
   
   ### Motivation
   
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve.*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-adapters] swamymavuri commented on a diff in pull request #46: [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

Posted by "swamymavuri (via GitHub)" <gi...@apache.org>.
swamymavuri commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1145926460


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java:
##########
@@ -70,6 +72,7 @@
 import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.bouncycastle.util.encoders.Hex;

Review Comment:
   Hi @lhotari,
    Do we need to encode/decode the headers with Hex, or will a plain String-to-byte conversion suffice here?
    Please confirm so that I can add the header encoding accordingly.
    For now, I have removed the Hex encoding/decoding and use the raw bytes to produce and consume.
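
   (For reference, a minimal sketch — assuming commons-codec `Hex`, and not necessarily the final approach for this PR — of why some encoding is needed at all: Kafka header values are arbitrary `byte[]`, while Pulsar message properties are `String`s, so a plain `getBytes()`/`new String(bytes)` round trip is only safe for values that are valid text in a known charset.)
   ```
   import java.util.Arrays;
   import org.apache.commons.codec.DecoderException;
   import org.apache.commons.codec.binary.Hex;

   public class HeaderEncodingSketch {
       public static void main(String[] args) throws DecoderException {
           byte[] headerValue = {0x00, (byte) 0x9C, (byte) 0xFF};   // arbitrary binary header value
           String property = Hex.encodeHexString(headerValue);      // what gets stored as a Pulsar property
           byte[] decoded = Hex.decodeHex(property.toCharArray());  // what the consumer recovers
           System.out.println(Arrays.equals(headerValue, decoded)); // true
       }
   }
   ```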





[GitHub] [pulsar-adapters] swamymavuri commented on a diff in pull request #46: [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

Posted by "swamymavuri (via GitHub)" <gi...@apache.org>.
swamymavuri commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1146162466


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java:
##########
@@ -404,10 +409,28 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {
                     timestamp = msg.getEventTime();
                     timestampType = TimestampType.CREATE_TIME;
                 }
-
-                ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
-                        timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);
-
+                ConsumerRecord<K, V> consumerRecord;
+                if (msg.getProperties() != null) {
+                    Headers headers = new RecordHeaders();
+                    msg.getProperties().forEach((k, v) -> {
+                        if (k.startsWith(MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX)) {

Review Comment:
   Here, we only keep header keys that start with **kafka.header.** and drop the rest.
   In Pulsar, however, every property added to `TypedMessageBuilder` is treated as a header (for example, the partition id). We skip those properties because they are not Hex-encoded, and decoding them would throw a **DecoderException**.
   
   If we also want to pass those properties through, we need one more condition, as below:
   ```
   if (k.startsWith(MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX)) {
       headers.add(originalKey, Hex.decodeHex(v));
   } else {
       headers.add(originalKey, v.getBytes());
   }
   ```
   @lhotari @cbornet @dlg99
   Please let me know whether it is **OK** to skip those non-Kafka property headers.
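
   (Spelled out a bit more — an illustrative sketch only, where the derivation of `originalKey` by stripping the prefix is an assumption rather than the exact PR code:)
   ```
   Headers headers = new RecordHeaders();
   msg.getProperties().forEach((k, v) -> {
       try {
           if (k.startsWith(MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX)) {
               // Kafka-originated header: strip the "kafka.header." prefix and Hex-decode the value
               String originalKey = k.substring(MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX.length());
               headers.add(originalKey, Hex.decodeHex(v.toCharArray()));
           } else {
               // Pulsar-internal property (e.g. the partition id): pass the raw bytes through
               headers.add(k, v.getBytes(StandardCharsets.UTF_8));
           }
       } catch (DecoderException e) {
           throw new IllegalStateException("Invalid Hex-encoded header value for key " + k, e);
       }
   });
   ```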





Re: [PR] [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers… [pulsar-adapters]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1406053682


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java:
##########
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import static org.mockito.ArgumentMatchers.anyCollection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.kafka.clients.constants.MessageConstants;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.naming.TopicName;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class PulsarKafkaConsumerTest {
+
+    @Test
+    public void testPulsarKafkaConsumerWithHeaders_noAck() throws Exception {
+        Consumer consumer = Mockito.mock(Consumer.class);
+        String topic = "topic";
+
+        Mockito.when(Mockito.mock(TopicName.class).getPartitionedTopicName()).thenReturn(topic);
+        Mockito.doReturn("topic").when(consumer).getTopic();
+
+        MessageMetadata messageMetadata = new MessageMetadata();
+        messageMetadata.setPublishTime(System.currentTimeMillis());
+
+        Map<String, String> headerMap = new HashMap<>();
+        String kafkaHeaderKey = MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX + "header1";
+        String kafkaHeaderValue = Hex.encodeHexString(kafkaHeaderKey.getBytes());
+        headerMap.put(kafkaHeaderKey, kafkaHeaderValue);
+        headerMap.put(KafkaMessageRouter.PARTITION_ID, "0");
+        Message<byte[]> msg = new MessageImpl<>(
+                topic,
+                "1:1",
+                headerMap,
+                "string".getBytes(),
+                Schema.BYTES,
+                messageMetadata
+        );
+
+        PulsarClient mockClient = Mockito.mock(PulsarClient.class);
+        PulsarClientImpl mockClientImpl = Mockito.mock(PulsarClientImpl.class);
+
+        CompletableFuture<Integer> mockNoOfPartitionFuture = CompletableFuture.completedFuture(1);
+
+        ClientBuilder mockClientBuilder = Mockito.mock(ClientBuilder.class);
+        Mockito.doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(Mockito.anyString());
+        Mockito.doReturn(mockClient).when(mockClientBuilder).build();
+
+        Mockito.when(mockClientImpl.getNumberOfPartitions(Mockito.anyString())).thenReturn(mockNoOfPartitionFuture);
+
+        Properties properties = new Properties();
+
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Collections.singletonList("pulsar://localhost:6650"));
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-subscription-name");
+
+        PulsarKafkaConsumer<Integer, String> pulsarKafkaConsumer =
+                new PulsarKafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer());
+
+        PulsarKafkaConsumer<Integer, String> pulsarKafkaConsumerSpy = Mockito.spy(pulsarKafkaConsumer);
+
+        Mockito.doNothing().when(pulsarKafkaConsumerSpy).seekToEnd(anyCollection());
+
+        pulsarKafkaConsumerSpy.received(consumer, msg);
+        pulsarKafkaConsumerSpy.poll(100);
+        pulsarKafkaConsumerSpy.close();
+
+        Assert.assertEquals(kafkaHeaderValue, msg.getProperty(kafkaHeaderKey));
+        Mockito.verify(pulsarKafkaConsumerSpy).seekToEnd(anyCollection());
+        Mockito.verify(consumer, Mockito.times(0)).acknowledgeCumulativeAsync(Mockito.any(MessageId.class));
+        Mockito.verify(Mockito.mock(Hex.class), Mockito.times(1)).decodeHex(Hex.encodeHexString(kafkaHeaderKey.getBytes()));

Review Comment:
   This line in this test doesn't make sense.
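
   (For example, a sketch of an assertion that would actually exercise the decoding path — assuming the result of the `poll(100)` call is captured instead of discarded, and that the intent is to check that the consumer turns the Hex-encoded property back into the original header bytes; `records`/`record` are illustrative names:)
   ```
   ConsumerRecords<Integer, String> records = pulsarKafkaConsumerSpy.poll(100);
   ConsumerRecord<Integer, String> record = records.iterator().next();
   Header header = record.headers().lastHeader("header1");
   Assert.assertNotNull(header);
   // the consumer should have Hex-decoded the property value back to the original bytes
   Assert.assertEquals(new String(header.value()), kafkaHeaderKey);
   ```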





Re: [PR] [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers… [pulsar-adapters]

Posted by "swamymavuri (via GitHub)" <gi...@apache.org>.
swamymavuri commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1406063435


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java:
##########
@@ -37,6 +37,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import org.apache.commons.codec.binary.Hex;

Review Comment:
   Yes, it is used while encoding the Kafka record headers.
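
   (Roughly along these lines — a sketch of the assumed shape of the producer-side mapping, not a verbatim excerpt from the PR:)
   ```
   // copy Kafka record headers into Pulsar message properties:
   // prefix the key with "kafka.header." and Hex-encode the binary value
   for (Header header : record.headers()) {
       String propertyKey = MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX + header.key();
       String propertyValue = Hex.encodeHexString(header.value());
       typedMessageBuilder.property(propertyKey, propertyValue);
   }
   ```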





[GitHub] [pulsar-adapters] swamymavuri commented on a diff in pull request #46: [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

Posted by "swamymavuri (via GitHub)" <gi...@apache.org>.
swamymavuri commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1155094507


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java:
##########
@@ -94,6 +120,10 @@ public void testPulsarKafkaConsumerWithHeaders() throws Exception {
         pulsarKafkaConsumer.received(consumer, msg);
         pulsarKafkaConsumer.poll(100);
         pulsarKafkaConsumer.close();
+
+
+        Assert.assertNotNull(msg.getProperty(kafkaHeader));

Review Comment:
   Asserted the header value from the message property.





[GitHub] [pulsar-adapters] nareshv commented on a diff in pull request #46: [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

Posted by "nareshv (via GitHub)" <gi...@apache.org>.
nareshv commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1145640315


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java:
##########
@@ -246,7 +248,13 @@ public void testPulsarKafkaSendAvro() throws PulsarClientException {
         foo.setField2("field2");
         foo.setField3(3);
 
-        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar));
+        Headers headers = new RecordHeaders();
+        String header1 = "header1";
+        String header2 = "header2";
+        headers.add(header1,header1.getBytes());
+        headers.add(header2,header2.getBytes());
+
+        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar, headers));

Review Comment:
   ok





[GitHub] [pulsar-adapters] swamymavuri commented on a diff in pull request #46: [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

Posted by "swamymavuri (via GitHub)" <gi...@apache.org>.
swamymavuri commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1144033792


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java:
##########
@@ -246,7 +248,13 @@ public void testPulsarKafkaSendAvro() throws PulsarClientException {
         foo.setField2("field2");
         foo.setField3(3);
 
-        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar));
+        Headers headers = new RecordHeaders();
+        String header1 = "header1";
+        String header2 = "header2";
+        headers.add(header1,header1.getBytes());
+        headers.add(header2,header2.getBytes());
+
+        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar, headers));

Review Comment:
   Hi @nareshv,
   I added consumer tests in `PulsarKafkaConsumerTest` to cover the header code path along with the existing code flow.
   Since there are no integration tests readily available for this that can run against a server or Docker container, validating the headers on the consumer side is not possible at the moment.
   I will raise an issue so we can work on integration tests later.





[GitHub] [pulsar-adapters] nareshv commented on a diff in pull request #46: [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

Posted by "nareshv (via GitHub)" <gi...@apache.org>.
nareshv commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1138092950


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java:
##########
@@ -246,7 +248,13 @@ public void testPulsarKafkaSendAvro() throws PulsarClientException {
         foo.setField2("field2");
         foo.setField3(3);
 
-        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar));
+        Headers headers = new RecordHeaders();
+        String header1 = "header1";
+        String header2 = "header2";
+        headers.add(header1,header1.getBytes());
+        headers.add(header2,header2.getBytes());
+
+        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar, headers));

Review Comment:
   Can you also add a test to confirm that these headers are seen in their original form in the consumed `ConsumerRecord` as well?





[GitHub] [pulsar-adapters] cbornet commented on a diff in pull request #46: [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

Posted by "cbornet (via GitHub)" <gi...@apache.org>.
cbornet commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1146180464


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java:
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+@PrepareForTest({TopicName.class, MessageIdUtils.class})
+@PowerMockIgnore({"org.apache.logging.log4j.*"})
+public class PulsarKafkaConsumerTest {
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @Test
+    public void testPulsarKafkaConsumerWithHeaders() throws Exception {
+        PowerMockito.mockStatic(TopicName.class);
+        PowerMockito.mockStatic(MessageIdUtils.class);

Review Comment:
   In core Pulsar, we don't use PowerMock anymore.
   We should probably remove it here as well and stop using it in new tests.



##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java:
##########
@@ -246,7 +248,13 @@ public void testPulsarKafkaSendAvro() throws PulsarClientException {
         foo.setField2("field2");
         foo.setField3(3);
 
-        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar));
+        Headers headers = new RecordHeaders();
+        String header1 = "header1";
+        String header2 = "header2";
+        headers.add(header1,header1.getBytes());
+        headers.add(header2,header2.getBytes());
+
+        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar, headers));

Review Comment:
   We should at least verify that the headers were set on the `mockTypedMessageBuilder`.
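
   (Something along these lines, for instance — a sketch only, assuming the headers end up as `property(...)` calls on the mocked `TypedMessageBuilder` with a "kafka.header." prefix and Hex-encoded values:)
   ```
   Mockito.verify(mockTypedMessageBuilder).property(
           MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX + "header1",
           Hex.encodeHexString("header1".getBytes()));
   Mockito.verify(mockTypedMessageBuilder).property(
           MessageConstants.KAFKA_MESSAGE_HEADER_PREFIX + "header2",
           Hex.encodeHexString("header2".getBytes()));
   ```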



##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java:
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+@PrepareForTest({TopicName.class, MessageIdUtils.class})
+@PowerMockIgnore({"org.apache.logging.log4j.*"})
+public class PulsarKafkaConsumerTest {
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @Test
+    public void testPulsarKafkaConsumerWithHeaders() throws Exception {
+        PowerMockito.mockStatic(TopicName.class);
+        PowerMockito.mockStatic(MessageIdUtils.class);
+
+        TopicName topicName = mock(TopicName.class);
+
+        doReturn("topic").when(topicName).getPartitionedTopicName();
+
+        ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+        Consumer consumer = mock(Consumer.class);
+        Message msg = mock(Message.class);
+        MessageId msgId = mock(MessageId.class);
+
+        PulsarClient mockClient = mock(PulsarClient.class);
+
+        doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+        when(TopicName.get(any())).thenReturn(topicName);
+        when(msg.getMessageId()).thenReturn(msgId);
+        doReturn(mockClient).when(mockClientBuilder).build();
+
+        Properties properties = new Properties();
+
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Collections.singletonList("pulsar://localhost:6650"));
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-subscription-name");
+
+
+        PulsarKafkaConsumer<Integer, String> pulsarKafkaConsumerSpy =
+                spy(new PulsarKafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer()));
+
+        doNothing().when(pulsarKafkaConsumerSpy).seekToEnd(anyCollection());
+        PowerMockito.whenNew(PulsarKafkaConsumer.class).withAnyArguments().thenReturn(pulsarKafkaConsumerSpy);
+
+        PulsarKafkaConsumer<Integer, String> pulsarKafkaConsumer =
+                new PulsarKafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer());
+
+        pulsarKafkaConsumer.received(consumer, msg);
+        pulsarKafkaConsumer.poll(100);
+        pulsarKafkaConsumer.close();
+    }
+
+    @Test
+    public void testPulsarKafkaConsumer() throws Exception {

Review Comment:
   I don't understand what this test does. There are no assertions, no messages, no header verification, ...






[GitHub] [pulsar-adapters] swamymavuri commented on a diff in pull request #46: [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

Posted by "swamymavuri (via GitHub)" <gi...@apache.org>.
swamymavuri commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1148767548


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java:
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+@PrepareForTest({TopicName.class, MessageIdUtils.class})
+@PowerMockIgnore({"org.apache.logging.log4j.*"})
+public class PulsarKafkaConsumerTest {
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @Test
+    public void testPulsarKafkaConsumerWithHeaders() throws Exception {
+        PowerMockito.mockStatic(TopicName.class);
+        PowerMockito.mockStatic(MessageIdUtils.class);
+
+        TopicName topicName = mock(TopicName.class);
+
+        doReturn("topic").when(topicName).getPartitionedTopicName();
+
+        ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+        Consumer consumer = mock(Consumer.class);
+        Message msg = mock(Message.class);
+        MessageId msgId = mock(MessageId.class);
+
+        PulsarClient mockClient = mock(PulsarClient.class);
+
+        doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+        when(TopicName.get(any())).thenReturn(topicName);
+        when(msg.getMessageId()).thenReturn(msgId);
+        doReturn(mockClient).when(mockClientBuilder).build();
+
+        Properties properties = new Properties();
+
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Collections.singletonList("pulsar://localhost:6650"));
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-subscription-name");
+
+
+        PulsarKafkaConsumer<Integer, String> pulsarKafkaConsumerSpy =
+                spy(new PulsarKafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer()));
+
+        doNothing().when(pulsarKafkaConsumerSpy).seekToEnd(anyCollection());
+        PowerMockito.whenNew(PulsarKafkaConsumer.class).withAnyArguments().thenReturn(pulsarKafkaConsumerSpy);
+
+        PulsarKafkaConsumer<Integer, String> pulsarKafkaConsumer =
+                new PulsarKafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer());
+
+        pulsarKafkaConsumer.received(consumer, msg);
+        pulsarKafkaConsumer.poll(100);
+        pulsarKafkaConsumer.close();
+    }
+
+    @Test
+    public void testPulsarKafkaConsumer() throws Exception {

Review Comment:
   Added assertion statements for header verification.





Re: [PR] [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers… [pulsar-adapters]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#issuecomment-1827695713

   It would be useful to have an integration test. There are existing integration tests in https://github.com/apache/pulsar-adapters/blob/master/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/ . Please add one to validate the correct behavior.
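
   (Sketch of what such a round-trip check could look like — assuming an already-configured `PulsarKafkaProducer`/`PulsarKafkaConsumer` pair pointing at the test broker; the topic, header name, and value here are placeholders:)
   ```
   Headers headers = new RecordHeaders();
   headers.add("trace-id", "abc123".getBytes(StandardCharsets.UTF_8));

   producer.send(new ProducerRecord<>(topic, null, "key", "value", headers)).get();

   ConsumerRecords<String, String> records = consumer.poll(5000);
   for (ConsumerRecord<String, String> record : records) {
       Header header = record.headers().lastHeader("trace-id");
       Assert.assertNotNull(header);
       Assert.assertEquals(new String(header.value(), StandardCharsets.UTF_8), "abc123");
   }
   ```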





[GitHub] [pulsar-adapters] cbornet commented on a diff in pull request #46: [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

Posted by "cbornet (via GitHub)" <gi...@apache.org>.
cbornet commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1152574905


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java:
##########
@@ -94,6 +120,10 @@ public void testPulsarKafkaConsumerWithHeaders() throws Exception {
         pulsarKafkaConsumer.received(consumer, msg);
         pulsarKafkaConsumer.poll(100);
         pulsarKafkaConsumer.close();
+
+
+        Assert.assertNotNull(msg.getProperty(kafkaHeader));

Review Comment:
   Can we assert the value of the header?
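
   (For instance — `expectedHexValue` and `originalHeaderValue` are placeholder names here for whatever the test put into the message:)
   ```
   Assert.assertEquals(msg.getProperty(kafkaHeader), expectedHexValue);
   Assert.assertEquals(new String(Hex.decodeHex(msg.getProperty(kafkaHeader).toCharArray())), originalHeaderValue);
   ```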





[GitHub] [pulsar-adapters] swamymavuri commented on pull request #46: [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

Posted by "swamymavuri (via GitHub)" <gi...@apache.org>.
swamymavuri commented on PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#issuecomment-1509121809

   @cbornet, can you please take a look at this PR and let me know if I need to add anything?




[GitHub] [pulsar-adapters] lhotari commented on a diff in pull request #46: [Issue #45] [pulsar-client-kafka-compat] Handled Kafka record headers…

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #46:
URL: https://github.com/apache/pulsar-adapters/pull/46#discussion_r1145703250


##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java:
##########
@@ -37,6 +37,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import org.apache.commons.codec.binary.Hex;

Review Comment:
   Is this import used at all?



##########
pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java:
##########
@@ -70,6 +72,7 @@
 import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.bouncycastle.util.encoders.Hex;

Review Comment:
   use org.apache.commons.codec.binary.Hex instead?




