Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/03 15:50:56 UTC

[GitHub] [pulsar] eolivelli opened a new pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

eolivelli opened a new pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448


   ### Motivation
   Currently the KafkaSource only handles strings and byte arrays; it does not support records with a schema.
   In Kafka, messages can be encoded using Avro, with schemas managed by a Schema Registry (by Confluent®).
   
   ### Modifications
   
   Summary of changes:
   - add a new `org.apache.pulsar.io.kafka.KafkaAvroRecordSource` that reads from Kafka using `io.confluent.kafka.serializers.KafkaAvroDeserializer` and produces GenericRecords to the Pulsar topic
   - this source supports schema evolution end-to-end (i.e. add fields to the original schema on the Kafka side and see the new fields in the Pulsar topic, without any reconfiguration or restart); see the consumer sketch after the enhancements list below
   - add the Confluent® Schema Registry Client to the Kafka Connector NAR; the license is compatible with the Apache 2 license, so we can redistribute it
   - the configuration of the Schema Registry Client is done in the consumerProperties property of the source (usually you add schema.registry.url); a configuration sketch follows this list
   - add integration tests with Kafka and Schema Registry
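
   For illustration only, a hedged sketch of the consumer configuration a user might pass through `consumerProperties` (the bootstrap address, group id, and registry URL below are placeholder assumptions, not values from this patch):

   ```java
   import java.util.Properties;

   // Hypothetical consumerProperties for the Avro-aware source: select the
   // Confluent deserializer and point it at a Schema Registry instance.
   Properties consumerProperties = new Properties();
   consumerProperties.put("bootstrap.servers", "localhost:9092");           // placeholder
   consumerProperties.put("group.id", "pulsar-kafka-source");               // placeholder
   consumerProperties.put("value.deserializer",
           "io.confluent.kafka.serializers.KafkaAvroDeserializer");
   consumerProperties.put("schema.registry.url", "http://localhost:8081");  // placeholder
   ```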
   
   It also adds a few enhancements to the Pulsar IO runtime:
   - allow PulsarSink to deal with `org.apache.pulsar.client.api.schema.GenericRecord`: it must not enforce an empty schema on the topic while starting the source
   - allow AvroWriter to deal with GenericRecord instances that are not subclasses of AvroGenericRecord
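
   To back the end-to-end schema evolution claim above, here is a minimal sketch of a Pulsar consumer reading the records this source produces via `AUTO_CONSUME` (the service URL, topic, subscription, and field name are assumptions for illustration):

   ```java
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.schema.GenericRecord;

   // AUTO_CONSUME decodes each message with its writer schema version, so fields
   // added on the Kafka side become visible here without any reconfiguration.
   PulsarClient client = PulsarClient.builder()
           .serviceUrl("pulsar://localhost:6650")                 // placeholder URL
           .build();
   Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
           .topic("persistent://public/default/from-kafka")       // placeholder topic
           .subscriptionName("demo")
           .subscribe();
   Message<GenericRecord> msg = consumer.receive();
   System.out.println(msg.getValue().getField("name"));           // hypothetical field
   ```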
   
   This patch includes the changes of the following pull request, which is to be committed as a prerequisite:
   https://github.com/apache/pulsar/pull/9396
   
   ### Verifying this change
   
   The patch introduces new unit tests and integration tests.
   The integration tests launch a Kafka container and a Confluent Schema Registry container.
   
   ### Documentation
   
   I will be happy to provide documentation once this patch is committed.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-773132055


   @eolivelli 
   
   Unfortunately, I disagree with you on this. This approach creates a very bad example for other people to follow.
   
   First of all, it basically creates a "connector" class per schema type. This is very bad practice. I would discourage a connector implementation from going down this route. It is impossible to maintain.

   Secondly, it solves a very narrowly scoped problem by introducing a specific type of source connector in which the key is a string and the value is Avro. All of the key schema information is dropped. Key schema information is important to a lot of streaming use cases, so you can't solve the problem with this approach. Following it, you would end up creating N*N connector classes (where N is the number of schema types).

   I would encourage people not to introduce code changes that are specialized for their own needs. A connector implementation should be beneficial to a broader set of users.
   
   > I would like not to enter the details of how Kafka and Confluent serialize data 
   
   Unfortunately, Kafka's Avro support is a Confluent open-source thing, not a Kafka community thing.
   
   We are always in the game of touching serialization details when converting a message from format X to format Y. You either do the conversion at a high level using an abstraction, or at a low level by working with the serialization details directly.
   
   The approach you proposed also performs poorly because it churns a lot of object allocations. Working at the serialization level can save a lot of memory copies and serialization/deserialization.
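
   For reference, the serialization detail in question is small: below is a hedged sketch of reading the Confluent wire format header without deserializing the Avro payload (assuming the standard magic-byte layout).

   ```java
   import java.nio.ByteBuffer;

   // Confluent wire format: byte 0 is a magic byte (0x0), bytes 1-4 are the
   // schema registry id (big-endian int), and the rest is the raw Avro payload.
   static int readSchemaId(byte[] confluentPayload) {
       ByteBuffer buffer = ByteBuffer.wrap(confluentPayload);
       byte magic = buffer.get();
       if (magic != 0) {
           throw new IllegalArgumentException("Unknown magic byte: " + magic);
       }
       return buffer.getInt(); // the remaining bytes can be forwarded untouched
   }
   ```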
   
   > we allowing Sources to push GenericRecords,
   
   This is a good initiative. But it should be isolated from this KafkaSource change. 





[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-792241876


   @sijie ping





[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585640943



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -151,18 +153,32 @@ public void start() {
         runnerThread.start();
     }
 
-    public abstract V extractValue(ConsumerRecord<String, byte[]> record);
+    public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+        return consumerRecord.value();
+    }
 
+    public Optional<String> extractKey(ConsumerRecord<Object, Object> consumerRecord) {
+        // we are currently supporting only String keys
+        return Optional.ofNullable((String) consumerRecord.key());

Review comment:
       This is only a transient situation, because we want to prepare the way for dealing with non-String keys.
   IIRC it was @sijie's request to introduce this `extractKey` method, in order to make clear the direction we are taking

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -151,18 +153,32 @@ public void start() {
         runnerThread.start();
     }
 
-    public abstract V extractValue(ConsumerRecord<String, byte[]> record);
+    public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+        return consumerRecord.value();
+    }
 
+    public Optional<String> extractKey(ConsumerRecord<Object, Object> consumerRecord) {
+        // we are currently supporting only String keys
+        return Optional.ofNullable((String) consumerRecord.key());
+    }
+
+    public abstract Schema<V> extractSchema(ConsumerRecord<Object, Object> consumerRecord);
+
+    @Slf4j
     static private class KafkaRecord<V> implements Record<V> {

Review comment:
       (see previous answer)







[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585520586



##########
File path: pulsar-io/kafka/pom.xml
##########
@@ -54,6 +54,31 @@
       <version>${kafka-client.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry</artifactId>
+      <version>${kafka.confluent.schemaregistryclient.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-streams-avro-serde</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>

Review comment:
       Is the pulsar-client-original dependency needed in the compile scope? The problem is that it will pull a lot of dependencies into the .nar files that are built. Perhaps `provided` scope would be fine? However, I think that `provided` scope isn't transitive, and therefore it might not have an impact in this library.
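
   For concreteness, a hedged sketch of the `provided` alternative (whether the connector compiles against only runtime-supplied APIs is an assumption to verify):

   ```xml
   <!-- Sketch: marking the client as provided keeps it out of the built .nar;
        the Pulsar runtime is then expected to supply it on the classpath. -->
   <dependency>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar-client-original</artifactId>
     <version>${project.version}</version>
     <scope>provided</scope>
   </dependency>
   ```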







[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-785373819


   /pulsarbot run-failure-checks








[GitHub] [pulsar] sijie commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-773795534


   > let's find together the right way to provide features to the users in the best way for the project.
   
   Yes. I already pointed out the right direction we should head in.
   
   > We already have KafkaBytesSource and KafkaStringSource, so I am just adding a new flavour of the KafkaSource, in fact the implementation is just about adding a new subclass of KafkaAbstractSource.
   > I am following the current style.
   
   The fact that it was there doesn't mean it is the right pattern to follow. The Kafka connector was not the first connector added. Most of the MQ connectors don't have schema information, hence it is okay to maintain `bytes` or `string` connectors. But if we are talking about schema-aware connectors, let's avoid using this pattern: the list of connectors becomes impossible to maintain. I would avoid going down this route.
   
   > we can work on this issue as well (and that's on my backlog), I didn't want to introduce too many features.
   
   It is not a question of introducing too many features or not; this approach is hard to maintain.
   
   > Using the Java Model with GenericRecord adds that additional cost, but the benefit are:
   
   The cost is huge when you have a very large struct. The connector should be designed to transfer data efficiently, instead of burning unnecessary CPU. Let's avoid that as much as we can.





[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r586205438



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -118,17 +119,18 @@ public void close() throws InterruptedException {
 
     public void start() {
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source");
+            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
             consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, byte[]> consumerRecords;
             while (running) {
-                consumerRecords = consumer.poll(1000);
+                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
-                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
+                for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+                    KafkaRecord record = new KafkaRecord(consumerRecord,

Review comment:
       In this case, I don't have a strong opinion since there doesn't seem to be a common convention for handling compiler warnings in the Pulsar code base.
   
   In general, I think that the Java compiler warnings about unchecked and unsafe operations should be handled at least in production code.







[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-773143619


   @sijie there is no problem in disagreeing;
   let's find, together, the right way to provide features to the users in the best way for the project.
   
   I am going to split the patch into two parts, this way we can make one step at a time.
   
   > First of all, it basically creates a "connector" class per schema type. This is a very bad practice. I would discourage a connector implementation going down this route. It is impossible to maintain.
   
   We already have `KafkaBytesSource` and `KafkaStringSource`, so I am just adding a new flavour of the KafkaSource; in fact, the implementation is just a new subclass of `KafkaAbstractSource`.
   I am following the current style.
   
   In my plans, I would like to work more on this KafkaSource and on the KafkaSink, and try to improve their structure.
   
   There is open work that will allow putting more sinks in the same NAR and will provide a better user experience.
   https://github.com/apache/pulsar/pull/3678
   
   In the meantime, users of the Kafka source can go with `--classname` (or select it from a web UI in interactive Pulsar management consoles).
   
   > in which the key is a string and the value is AVRO
   
   we can work on this issue as well (and that's on my backlog), I didn't want to introduce too many features.
   
   I have users who are used to advanced data mapping mechanisms for both the key and the value, so mapping the key is very important to me.
   That said, the `KafkaAbstractSource` currently works on a string key; that is preexisting code.
   
   > The approach you proposed also performs poorly because it churns a lot of object allocations. Working at the serialization level can save a lot of memory copies and serialization/deserialization.
   
   I know about this fact, and I know how the StreamNative connector works.
   
   Using the Java model with GenericRecord adds that extra cost, but the benefits are:
   - simpler code, using code provided by the same vendor that maintains that serialization protocol
   - we can follow the protocol's evolution just by upgrading the Confluent library
   - we are using pure Kafka/Pulsar APIs, fully integrated with the framework, which will allow us to leverage all future improvements
   





[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r586183046



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -118,17 +119,18 @@ public void close() throws InterruptedException {
 
     public void start() {
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source");
+            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
             consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, byte[]> consumerRecords;
             while (running) {
-                consumerRecords = consumer.poll(1000);
+                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
-                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
+                for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+                    KafkaRecord record = new KafkaRecord(consumerRecord,

Review comment:
       The reason why I made the comment is that in some projects, there's a convention that the Java compiler shouldn't produce warnings about the source code. Warnings should be suppressed or eliminated by fixing the code.
   
   The result of ignoring the type parameters is this type of warning:
   ```
   [INFO] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java: /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java uses unchecked or unsafe operations.
   [INFO] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java: Recompile with -Xlint:unchecked for details.
   ```
   
   After adding `<arg>-Xlint:unchecked</arg>` to `compilerArgs` in `maven-compiler-plugin`'s configuration, it produces this type of detailed warnings:
   ```
   [WARNING] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java:[130,42] unchecked call to KafkaRecord(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String,?>,java.util.Optional<java.lang.String>,V,org.apache.pulsar.client.api.Schema<V>) as a member of the raw type org.apache.pulsar.io.kafka.KafkaAbstractSource.KafkaRecord
   [WARNING] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java:[134,28] unchecked method invocation: method consume in class org.apache.pulsar.io.core.PushSource is applied to given types
     required: org.apache.pulsar.functions.api.Record<T>
     found: org.apache.pulsar.io.kafka.KafkaAbstractSource.KafkaRecord
   [WARNING] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java:[134,29] unchecked conversion
     required: org.apache.pulsar.functions.api.Record<T>
     found:    org.apache.pulsar.io.kafka.KafkaAbstractSource.KafkaRecord
   ```
   
   Shouldn't these warnings be handled?
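
   For what it's worth, a hedged sketch of one way to localize the unchecked cast while keeping `KafkaRecord` parameterized (the constructor shape is taken from the warning output above; the real classes in this PR may differ):

   ```java
   // Keep the record parameterized and confine the unchecked cast to one spot.
   @SuppressWarnings("unchecked")
   private void consumeRecord(ConsumerRecord<Object, Object> consumerRecord) {
       KafkaRecord<V> record = new KafkaRecord<>(consumerRecord,
               extractKey(consumerRecord),
               (V) extractValue(consumerRecord),   // the single unchecked cast
               extractSchema(consumerRecord));
       consume(record);                            // now a checked Record<V>
   }
   ```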







[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r571884723



##########
File path: pom.xml
##########
@@ -152,6 +152,7 @@ flexible messaging model and an intuitive client API.</description>
     <jcip.version>1.0</jcip.version>
     <prometheus-jmx.version>0.14.0</prometheus-jmx.version>
     <confluent.version>5.3.2</confluent.version>
+    <kafka.confluent.schemaregstryclient.version>3.3.1</kafka.confluent.schemaregstryclient.version>

Review comment:
       done







[GitHub] [pulsar] sijie commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r589181651



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithSchema.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.Value;
+
+/**
+ * This is a wrapper around a Byte array (the Avro encoded record) and a schema id in the Kafka Schema Registry.
+ */
+@Value
+public class BytesWithSchema {

Review comment:
       BytesWithKafkaSchema

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.subject.TopicNameStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+@Slf4j
+class AvroSchemaCache {
+    private ConcurrentHashMap<Integer, Schema<byte[]>> cache = new ConcurrentHashMap<>();

Review comment:
       final

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.subject.TopicNameStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+@Slf4j
+class AvroSchemaCache {
+    private ConcurrentHashMap<Integer, Schema<byte[]>> cache = new ConcurrentHashMap<>();
+    private final SchemaRegistryClient schemaRegistryClient;
+
+    public AvroSchemaCache(SchemaRegistryClient schemaRegistryClient) {
+        this.schemaRegistryClient = schemaRegistryClient;
+    }
+
+    public Schema<byte[]> get(int schemaId) {
+        if (cache.size() > 100) {
+            // very simple auto cleanup
+            // schema do not change very often, we just do not want this map to grow
+            // without limits
+            cache.clear();

Review comment:
       why not use Guava cache?
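
   For reference, a minimal sketch of the Guava alternative (`fetchSchema` is a hypothetical stand-in for the registry lookup done below):

   ```java
   import com.google.common.cache.Cache;
   import com.google.common.cache.CacheBuilder;
   import java.util.concurrent.ExecutionException;
   import org.apache.pulsar.client.api.Schema;

   // A size-bounded cache evicts old entries one at a time instead of clearing
   // everything once an arbitrary threshold is crossed.
   private final Cache<Integer, Schema<byte[]>> cache =
           CacheBuilder.newBuilder().maximumSize(100).build();

   public Schema<byte[]> get(int schemaId) throws ExecutionException {
       return cache.get(schemaId, () -> fetchSchema(schemaId)); // fetchSchema: hypothetical
   }
   ```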

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -151,18 +154,32 @@ public void start() {
         runnerThread.start();
     }
 
-    public abstract V extractValue(ConsumerRecord<String, byte[]> record);
+    public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+        return consumerRecord.value();
+    }
 
+    public Optional<String> extractKey(ConsumerRecord<Object, Object> consumerRecord) {
+        // we are currently supporting only String keys
+        return Optional.ofNullable((String) consumerRecord.key());

Review comment:
       If you are introducing a `bytes` connector, let's not assume it is a `String`.

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -41,16 +49,80 @@
 @Slf4j
 public class KafkaBytesSource extends KafkaAbstractSource<byte[]> {
 
+    private AvroSchemaCache schemaCache;
+
+    private static final Collection<String> SUPPORTED_KEY_DESERIALIZERS =
+            Collections.unmodifiableCollection(Arrays.asList(StringDeserializer.class.getName()));
+
+    private static final Collection<String> SUPPORTED_VALUE_DESERIALIZERS =
+            Collections.unmodifiableCollection(Arrays.asList(ByteArrayDeserializer.class.getName(), KafkaAvroDeserializer.class.getName()));
+
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         log.info("Created kafka consumer config : {}", props);
+
+        String currentKeyDeserializer = props.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        if (!SUPPORTED_KEY_DESERIALIZERS.contains(currentKeyDeserializer)) {
+            throw new IllegalArgumentException("Unsupported key deserializer: " + currentKeyDeserializer + ", only " + SUPPORTED_KEY_DESERIALIZERS);
+        }
+
+        String currentValueDeserializer = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+        if (!SUPPORTED_VALUE_DESERIALIZERS.contains(currentValueDeserializer)) {
+            throw new IllegalArgumentException("Unsupported value deserializer: " + currentValueDeserializer + ", only " + SUPPORTED_VALUE_DESERIALIZERS);
+        }
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValueDeserializer != null && currentValueDeserializer.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SchemaExtractorDeserializer.class.getName());
+            KafkaAvroDeserializerConfig config = new KafkaAvroDeserializerConfig(props);
+            List<String> urls = config.getSchemaRegistryUrls();
+            int maxSchemaObject = config.getMaxSchemasPerSubject();
+            SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(urls, maxSchemaObject);
+            schemaCache = new AvroSchemaCache(schemaRegistryClient);
+        }
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithSchema) {
+            return ((BytesWithSchema) value).getValue();
+        }
+        return value;
     }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<byte[]> extractSchema(ConsumerRecord<Object, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithSchema) {
+            return schemaCache.get(((BytesWithSchema) value).getSchemaId());
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class SchemaExtractorDeserializer implements Deserializer<BytesWithSchema> {

Review comment:
       Can you rename it as ExtractKafkaAvroSchemaDeserializer? Because this is specific to Kafka avro schema.

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.subject.TopicNameStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+@Slf4j
+class AvroSchemaCache {
+    private ConcurrentHashMap<Integer, Schema<byte[]>> cache = new ConcurrentHashMap<>();
+    private final SchemaRegistryClient schemaRegistryClient;
+
+    public AvroSchemaCache(SchemaRegistryClient schemaRegistryClient) {
+        this.schemaRegistryClient = schemaRegistryClient;
+    }
+
+    public Schema<byte[]> get(int schemaId) {
+        if (cache.size() > 100) {
+            // very simple auto cleanup
+            // schema do not change very often, we just do not want this map to grow
+            // without limits
+            cache.clear();
+        }
+        return cache.computeIfAbsent(schemaId, id -> {
+            try {
+                org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId);
+                String definition = schema.toString(false);
+                log.info("Schema {} definition {}", schemaId, definition);
+                return Schema.AUTO_PRODUCE_BYTES(GenericAvroSchema.of(SchemaInfo.builder()

Review comment:
       I don't understand why you need to wrap this using `AUTO_PRODUCE_BYTES`. `AUTO_PRODUCE_BYTES` is an expensive implementation because it has to deserialize the payload in order to verify the schema.
   
   You can implement a special schema to deal with BytesWithSchema. https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/schema/KafkaAvroSchema.java
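
   A hedged sketch of that idea: a pass-through `Schema<byte[]>` that carries the registry-derived SchemaInfo but never re-reads the payload. The exact `Schema` SPI surface varies across Pulsar versions, so treat this as the shape of the idea rather than the definitive API.

   ```java
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.common.schema.SchemaInfo;

   // Forwards the already-Avro-encoded Kafka bytes as-is while still attaching
   // the Avro SchemaInfo downloaded from the Kafka Schema Registry.
   class PassthroughAvroSchema implements Schema<byte[]> {
       private final SchemaInfo schemaInfo;

       PassthroughAvroSchema(SchemaInfo schemaInfo) {
           this.schemaInfo = schemaInfo;
       }

       @Override
       public byte[] encode(byte[] avroEncodedPayload) {
           return avroEncodedPayload; // no deserialize/verify round trip
       }

       @Override
       public SchemaInfo getSchemaInfo() {
           return schemaInfo;
       }

       @Override
       public Schema<byte[]> clone() {
           return this; // immutable apart from the shared SchemaInfo
       }
   }
   ```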
   
   

##########
File path: pulsar-io/kafka/src/main/resources/findbugsExclude.xml
##########
@@ -19,4 +19,8 @@
 
 -->
 <FindBugsFilter>
-</FindBugsFilter>
\ No newline at end of file
+    <Match>
+        <Class name="org.apache.pulsar.io.kafka.BytesWithSchema" />
+        <Bug pattern="EI_EXPOSE_REP,EI_EXPOSE_REP2" />

Review comment:
       Use ByteBuffer; then you can avoid this.
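
   A minimal sketch of that change, using the `BytesWithKafkaSchema` name suggested above:

   ```java
   import java.nio.ByteBuffer;
   import lombok.Value;

   // Holding a ByteBuffer instead of a raw byte[] avoids exposing a mutable
   // internal array, which is what the EI_EXPOSE_REP findbugs patterns flag.
   @Value
   public class BytesWithKafkaSchema {
       ByteBuffer value;  // view of the Avro-encoded record
       int schemaId;      // id of the schema in the Kafka Schema Registry
   }
   ```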

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -116,19 +117,21 @@ public void close() throws InterruptedException {
         LOG.info("Kafka source stopped.");
     }
 
+    @SuppressWarnings("unchecked")
     public void start() {
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source");
+            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
             consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, byte[]> consumerRecords;
             while (running) {
-                consumerRecords = consumer.poll(1000);
+                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
-                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
+                for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+                    KafkaRecord record = new KafkaRecord(consumerRecord,
+                            extractKey(consumerRecord),

Review comment:
       Any reason why you extract the key eagerly, right here? Why can't you extract the key lazily, when it is accessed via KafkaRecord.getKey?
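
   For illustration, a hedged sketch of the lazy variant (class and field names are illustrative, not from this PR):

   ```java
   import java.util.Optional;
   import java.util.function.Supplier;

   // Defers key extraction until getKey() is first called, so records whose key
   // is never read pay no extraction cost in the poll loop.
   class LazyKeyHolder {
       private final Supplier<Optional<String>> keySupplier;
       private Optional<String> key; // computed on first access

       LazyKeyHolder(Supplier<Optional<String>> keySupplier) {
           this.keySupplier = keySupplier;
       }

       synchronized Optional<String> getKey() {
           if (key == null) {
               key = keySupplier.get();
           }
           return key;
       }
   }
   ```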







[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r589225634



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -116,19 +117,21 @@ public void close() throws InterruptedException {
         LOG.info("Kafka source stopped.");
     }
 
+    @SuppressWarnings("unchecked")
     public void start() {
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source");
+            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
             consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, byte[]> consumerRecords;
             while (running) {
-                consumerRecords = consumer.poll(1000);
+                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
-                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
+                for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+                    KafkaRecord record = new KafkaRecord(consumerRecord,
+                            extractKey(consumerRecord),

Review comment:
       I added this level of indirection in order to make extractKey like extractValue and extractSchema; this way, a subclass will be able to provide its own implementation.
   That would be part of the follow-up work on supporting non-string keys.
   
   Not a big deal; I am removing this method now.

##########
File path: pulsar-io/kafka/src/main/resources/findbugsExclude.xml
##########
@@ -19,4 +19,8 @@
 
 -->
 <FindBugsFilter>
-</FindBugsFilter>
\ No newline at end of file
+    <Match>
+        <Class name="org.apache.pulsar.io.kafka.BytesWithSchema" />
+        <Bug pattern="EI_EXPOSE_REP,EI_EXPOSE_REP2" />

Review comment:
       I have switched to ByteBuffer

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.subject.TopicNameStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+@Slf4j
+class AvroSchemaCache {
+    private ConcurrentHashMap<Integer, Schema<byte[]>> cache = new ConcurrentHashMap<>();

Review comment:
       done

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -41,16 +49,80 @@
 @Slf4j
 public class KafkaBytesSource extends KafkaAbstractSource<byte[]> {
 
+    private AvroSchemaCache schemaCache;
+
+    private static final Collection<String> SUPPORTED_KEY_DESERIALIZERS =
+            Collections.unmodifiableCollection(Arrays.asList(StringDeserializer.class.getName()));
+
+    private static final Collection<String> SUPPORTED_VALUE_DESERIALIZERS =
+            Collections.unmodifiableCollection(Arrays.asList(ByteArrayDeserializer.class.getName(), KafkaAvroDeserializer.class.getName()));
+
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         log.info("Created kafka consumer config : {}", props);
+
+        String currentKeyDeserializer = props.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        if (!SUPPORTED_KEY_DESERIALIZERS.contains(currentKeyDeserializer)) {
+            throw new IllegalArgumentException("Unsupported key deserializer: " + currentKeyDeserializer + ", only " + SUPPORTED_KEY_DESERIALIZERS);
+        }
+
+        String currentValueDeserializer = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+        if (!SUPPORTED_VALUE_DESERIALIZERS.contains(currentValueDeserializer)) {
+            throw new IllegalArgumentException("Unsupported value deserializer: " + currentValueDeserializer + ", only " + SUPPORTED_VALUE_DESERIALIZERS);
+        }
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValueDeserializer != null && currentValueDeserializer.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SchemaExtractorDeserializer.class.getName());
+            KafkaAvroDeserializerConfig config = new KafkaAvroDeserializerConfig(props);
+            List<String> urls = config.getSchemaRegistryUrls();
+            int maxSchemaObject = config.getMaxSchemasPerSubject();
+            SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(urls, maxSchemaObject);
+            schemaCache = new AvroSchemaCache(schemaRegistryClient);
+        }
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithSchema) {
+            return ((BytesWithSchema) value).getValue();
+        }
+        return value;
     }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<byte[]> extractSchema(ConsumerRecord<Object, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithSchema) {
+            return schemaCache.get(((BytesWithSchema) value).getSchemaId());
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class SchemaExtractorDeserializer implements Deserializer<BytesWithSchema> {

Review comment:
       done







[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r589221356



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.subject.TopicNameStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+@Slf4j
+class AvroSchemaCache {
+    private ConcurrentHashMap<Integer, Schema<byte[]>> cache = new ConcurrentHashMap<>();
+    private final SchemaRegistryClient schemaRegistryClient;
+
+    public AvroSchemaCache(SchemaRegistryClient schemaRegistryClient) {
+        this.schemaRegistryClient = schemaRegistryClient;
+    }
+
+    public Schema<byte[]> get(int schemaId) {
+        if (cache.size() > 100) {
+            // very simple auto cleanup
+            // schema do not change very often, we just do not want this map to grow
+            // without limits
+            cache.clear();

Review comment:
       done
   
   The reason was that I didn't want to add an additional third-party dependency (the Guava jar will be bundled into the .nar file).

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.subject.TopicNameStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+@Slf4j
+class AvroSchemaCache {
+    private ConcurrentHashMap<Integer, Schema<byte[]>> cache = new ConcurrentHashMap<>();
+    private final SchemaRegistryClient schemaRegistryClient;
+
+    public AvroSchemaCache(SchemaRegistryClient schemaRegistryClient) {
+        this.schemaRegistryClient = schemaRegistryClient;
+    }
+
+    public Schema<byte[]> get(int schemaId) {
+        if (cache.size() > 100) {
+            // very simple auto cleanup
+            // schema do not change very often, we just do not want this map to grow
+            // without limits
+            cache.clear();
+        }
+        return cache.computeIfAbsent(schemaId, id -> {
+            try {
+                org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId);
+                String definition = schema.toString(false);
+                log.info("Schema {} definition {}", schemaId, definition);
+                return Schema.AUTO_PRODUCE_BYTES(GenericAvroSchema.of(SchemaInfo.builder()

Review comment:
       done

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -41,16 +49,80 @@
 @Slf4j
 public class KafkaBytesSource extends KafkaAbstractSource<byte[]> {
 
+    private AvroSchemaCache schemaCache;
+
+    private static final Collection<String> SUPPORTED_KEY_DESERIALIZERS =
+            Collections.unmodifiableCollection(Arrays.asList(StringDeserializer.class.getName()));
+
+    private static final Collection<String> SUPPORTED_VALUE_DESERIALIZERS =
+            Collections.unmodifiableCollection(Arrays.asList(ByteArrayDeserializer.class.getName(), KafkaAvroDeserializer.class.getName()));
+
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         log.info("Created kafka consumer config : {}", props);
+
+        String currentKeyDeserializer = props.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        if (!SUPPORTED_KEY_DESERIALIZERS.contains(currentKeyDeserializer)) {
+            throw new IllegalArgumentException("Unsupported key deserializer: " + currentKeyDeserializer + ", only " + SUPPORTED_KEY_DESERIALIZERS);
+        }
+
+        String currentValueDeserializer = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+        if (!SUPPORTED_VALUE_DESERIALIZERS.contains(currentValueDeserializer)) {
+            throw new IllegalArgumentException("Unsupported value deserializer: " + currentValueDeserializer + ", only " + SUPPORTED_VALUE_DESERIALIZERS);
+        }
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValueDeserializer != null && currentValueDeserializer.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SchemaExtractorDeserializer.class.getName());
+            KafkaAvroDeserializerConfig config = new KafkaAvroDeserializerConfig(props);
+            List<String> urls = config.getSchemaRegistryUrls();
+            int maxSchemaObject = config.getMaxSchemasPerSubject();
+            SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(urls, maxSchemaObject);
+            schemaCache = new AvroSchemaCache(schemaRegistryClient);
+        }
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithSchema) {
+            return ((BytesWithSchema) value).getValue();
+        }
+        return value;
     }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<byte[]> extractSchema(ConsumerRecord<Object, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithSchema) {
+            return schemaCache.get(((BytesWithSchema) value).getSchemaId());
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class SchemaExtractorDeserializer implements Deserializer<BytesWithSchema> {

Review comment:
       done

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithSchema.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.Value;
+
+/**
+ * This is a wrapper around a byte array (the Avro-encoded record) and a schema id in the Kafka Schema Registry.
+ */
+@Value
+public class BytesWithSchema {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r586197528



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -118,17 +119,18 @@ public void close() throws InterruptedException {
 
     public void start() {
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source");
+            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
             consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, byte[]> consumerRecords;
             while (running) {
-                consumerRecords = consumer.poll(1000);
+                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
-                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
+                for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+                    KafkaRecord record = new KafkaRecord(consumerRecord,

Review comment:
       @lhotari if you feel strongly, I will remove the generic type. I tried to remove the template parameter, but the code looks uglier to me.
   
   I added @SuppressWarnings("unchecked")




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r574312186



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/AvroKafkaSourceTest.java
##########
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import com.google.gson.Gson;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testcontainers.utility.DockerImageName;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SourceStatus;
+
+import static org.testng.Assert.*;
+
+/**
+ * A tester for testing kafka source with Avro Messages.
+ * This test starts a PulsarCluster, a container with a Kafka Broker
+ * and a container with the SchemaRegistry.
+ * It populates a Kafka topic with Avro encoded messages with schema
+ * and then it verifies that the records are correctly received
+ * by a Pulsar Consumer
+ */
+@Slf4j
+public class AvroKafkaSourceTest extends PulsarFunctionsTestBase {
+
+    private static final String SOURCE_TYPE = "kafka";
+
+    final Duration ONE_MINUTE = Duration.ofMinutes(1);
+    final Duration TEN_SECONDS = Duration.ofSeconds(10);
+
+    final RetryPolicy statusRetryPolicy = new RetryPolicy()
+            .withMaxDuration(ONE_MINUTE)
+            .withDelay(TEN_SECONDS)
+            .onRetry(e -> log.error("Retry ... "));
+
+    private final String kafkaTopicName = "kafkasourcetopic";
+
+    private EnhancedKafkaContainer kafkaContainer;
+    private SchemaRegistryContainer schemaRegistryContainer;
+
+    protected final Map<String, Object> sourceConfig;
+    protected final String kafkaContainerName = "kafkacontainer";
+    protected final String schemaRegistryContainerName = "schemaregistry";
+
+    public AvroKafkaSourceTest() {
+        sourceConfig = new HashMap<>();
+    }
+
+    @Test(groups = "source")
+    public void test() throws Exception {
+        if (pulsarCluster == null) {
+            super.setupCluster();
+            super.setupFunctionWorkers();
+        }
+        startKafkaContainers(pulsarCluster);
+        try {
+            testSource();
+        } finally {
+            stopKafkaContainers(pulsarCluster);
+        }
+    }
+
+    private String getBootstrapServersOnDockerNetwork() {
+        return kafkaContainerName + ":9093";
+    }
+
+
+    public void startKafkaContainers(PulsarCluster cluster) throws Exception {
+        this.kafkaContainer = createKafkaContainer(cluster);
+        cluster.startService(kafkaContainerName, kafkaContainer);
+        log.info("creating schema registry kafka {}",  getBootstrapServersOnDockerNetwork());
+        this.schemaRegistryContainer = new SchemaRegistryContainer(getBootstrapServersOnDockerNetwork());
+        cluster.startService(schemaRegistryContainerName, schemaRegistryContainer);
+        sourceConfig.put("bootstrapServers", getBootstrapServersOnDockerNetwork());
+        sourceConfig.put("groupId", "test-source-group");
+        sourceConfig.put("fetchMinBytes", 1L);
+        sourceConfig.put("autoCommitIntervalMs", 10L);
+        sourceConfig.put("sessionTimeoutMs", 10000L);
+        sourceConfig.put("heartbeatIntervalMs", 5000L);
+        sourceConfig.put("topic", kafkaTopicName);
+        sourceConfig.put("consumerConfigProperties",
+                ImmutableMap.of("schema.registry.url", getRegistryAddressInDockerNetwork())
+        );
+    }
+
+    private class EnhancedKafkaContainer extends KafkaContainer {
+
+        public EnhancedKafkaContainer(DockerImageName dockerImageName) {
+            super(dockerImageName);
+        }
+
+        @Override
+        public String getBootstrapServers() {
+            // we have to override this function
+            // because we want the Kafka Broker to advertise itself
+            // with the docker network address
+            // otherwise the Kafka Schema Registry won't work
+            return "PLAINTEXT://" + kafkaContainerName + ":9093";
+        }
+
+    }
+
+    protected EnhancedKafkaContainer createKafkaContainer(PulsarCluster cluster) {
+        return (EnhancedKafkaContainer) new EnhancedKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.0.1"))
+                .withEmbeddedZookeeper()
+                .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
+                        .withName(kafkaContainerName)
+                );
+    }
+
+    public void stopKafkaContainers(PulsarCluster cluster) {
+        if (null != schemaRegistryContainer) {
+            cluster.stopService(schemaRegistryContainerName, schemaRegistryContainer);
+        }
+        if (null != kafkaContainer) {
+            cluster.stopService(kafkaContainerName, kafkaContainer);
+        }
+    }
+
+    public void prepareSource() throws Exception {
+        log.info("creating topic");
+        ExecResult execResult = kafkaContainer.execInContainer(
+            "/usr/bin/kafka-topics",
+            "--create",
+            "--zookeeper",
+                getZooKeeperAddressInDockerNetwork(),
+            "--partitions",
+            "1",
+            "--replication-factor",
+            "1",
+            "--topic",
+            kafkaTopicName);
+        assertTrue(
+            execResult.getStdout().contains("Created topic"),
+            execResult.getStdout());
+
+    }
+
+    private String getZooKeeperAddressInDockerNetwork() {
+        return kafkaContainerName +":2181";
+    }
+
+    private <T extends GenericContainer> void testSource()  throws Exception {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r570979416



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
##########
@@ -52,4 +67,29 @@ public GenericAvroWriter(Schema schema) {
             this.byteArrayOutputStream.reset();
         }
     }
+
+    /**
+     * This is an adapter from Pulsar GenericRecord to Avro classes.
+     */
+    private class GenericRecordAdapter extends SpecificRecordBase {

Review comment:
       @lhotari I need to access the `schema` field.
   Making it static would add a new field; that said, we already have an implicit field referring to the enclosing object, so it is not a big deal. Do you have any preference?
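   For comparison, a rough sketch of what the static variant could look like, with the schema passed in explicitly (the `get`/`put` bodies below are placeholder stubs, not the real adapter logic):

   ```java
   import org.apache.avro.specific.SpecificRecordBase;

   // hypothetical static variant: no implicit reference to the enclosing
   // GenericAvroWriter, the avro schema is handed over via the constructor
   private static class GenericRecordAdapter extends SpecificRecordBase {
       private final org.apache.avro.Schema schema;

       GenericRecordAdapter(org.apache.avro.Schema schema) {
           this.schema = schema;
       }

       @Override
       public org.apache.avro.Schema getSchema() {
           return schema;
       }

       @Override
       public Object get(int field) {
           throw new UnsupportedOperationException(); // placeholder stub
       }

       @Override
       public void put(int field, Object value) {
           throw new UnsupportedOperationException(); // placeholder stub
       }
   }
   ```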

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/PulsarSchemaCache.java
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+class PulsarSchemaCache<T> {
+
+    @Data
+    @AllArgsConstructor
+    public static final class CachedSchema<T> {
+        private final Schema<T> schema;
+        private final List<Field> fields;
+    }
+
+    private IdentityHashMap<org.apache.avro.Schema, CachedSchema<T>> cache = new IdentityHashMap<>();
+
+    public synchronized CachedSchema<T> get(org.apache.avro.Schema avroSchema) {
+        if (cache.size() > 100) {
+            // very simple auto cleanup
+            // schemas do not change very often, we just don't want this map to grow unbounded

Review comment:
       I have no other point at which to trigger this cleanup.
   
   IMHO it is not worth adding another background thread just to perform this cleanup.
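   For reference, a minimal sketch of a size-bounded alternative built on Guava's CacheBuilder, which evicts entries automatically so no cleanup code is needed inside `get()` (the `fetchSchema` lookup below is a placeholder):

   ```java
   import com.google.common.cache.CacheBuilder;
   import com.google.common.cache.CacheLoader;
   import com.google.common.cache.LoadingCache;
   import org.apache.pulsar.client.api.Schema;

   class BoundedSchemaCache {
       // at most 100 schemas are kept; older entries are evicted automatically
       private final LoadingCache<Integer, Schema<byte[]>> cache = CacheBuilder.newBuilder()
               .maximumSize(100)
               .build(new CacheLoader<Integer, Schema<byte[]>>() {
                   @Override
                   public Schema<byte[]> load(Integer schemaId) {
                       return fetchSchema(schemaId);
                   }
               });

       Schema<byte[]> get(int schemaId) {
           return cache.getUnchecked(schemaId);
       }

       private Schema<byte[]> fetchSchema(int schemaId) {
           // placeholder: resolve the schema from the registry and translate it
           throw new UnsupportedOperationException();
       }
   }
   ```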
   

##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/AvroKafkaSourceTest.java
##########
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import com.google.gson.Gson;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testcontainers.utility.DockerImageName;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SourceStatus;
+
+import static org.testng.Assert.*;
+
+/**
+ * A tester for testing kafka source with Avro Messages.
+ * This test starts a PulsarCluster, a container with a Kafka Broker
+ * and a container with the SchemaRegistry.
+ * It populates a Kafka topic with Avro encoded messages with schema
+ * and then it verifies that the records are correctly received
+ * by a Pulsar Consumer
+ */
+@Slf4j
+public class AvroKafkaSourceTest extends PulsarFunctionsTestBase {
+
+    private static final String SOURCE_TYPE = "kafka";
+
+    final Duration ONE_MINUTE = Duration.ofMinutes(1);
+    final Duration TEN_SECONDS = Duration.ofSeconds(10);
+
+    final RetryPolicy statusRetryPolicy = new RetryPolicy()
+            .withMaxDuration(ONE_MINUTE)
+            .withDelay(TEN_SECONDS)
+            .onRetry(e -> log.error("Retry ... "));
+
+    private final String kafkaTopicName = "kafkasourcetopic";
+
+    private EnhancedKafkaContainer kafkaContainer;
+    private SchemaRegistryContainer schemaRegistryContainer;
+
+    protected final Map<String, Object> sourceConfig;
+    protected final String kafkaContainerName = "kafkacontainer";
+    protected final String schemaRegistryContainerName = "schemaregistry";
+
+    public AvroKafkaSourceTest() {
+        sourceConfig = new HashMap<>();
+    }
+
+    @Test(groups = "source")
+    public void test() throws Exception {
+        if (pulsarCluster == null) {
+            super.setupCluster();
+            super.setupFunctionWorkers();
+        }
+        startKafkaContainers(pulsarCluster);
+        try {
+            testSource();
+        } finally {
+            stopKafkaContainers(pulsarCluster);
+        }
+    }
+
+    private String getBootstrapServersOnDockerNetwork() {
+        return kafkaContainerName + ":9093";
+    }
+
+
+    public void startKafkaContainers(PulsarCluster cluster) throws Exception {
+        this.kafkaContainer = createKafkaContainer(cluster);
+        cluster.startService(kafkaContainerName, kafkaContainer);
+        log.info("creating schema registry kafka {}",  getBootstrapServersOnDockerNetwork());
+        this.schemaRegistryContainer = new SchemaRegistryContainer(getBootstrapServersOnDockerNetwork());
+        cluster.startService(schemaRegistryContainerName, schemaRegistryContainer);
+        sourceConfig.put("bootstrapServers", getBootstrapServersOnDockerNetwork());
+        sourceConfig.put("groupId", "test-source-group");
+        sourceConfig.put("fetchMinBytes", 1L);
+        sourceConfig.put("autoCommitIntervalMs", 10L);
+        sourceConfig.put("sessionTimeoutMs", 10000L);
+        sourceConfig.put("heartbeatIntervalMs", 5000L);
+        sourceConfig.put("topic", kafkaTopicName);
+        sourceConfig.put("consumerConfigProperties",
+                ImmutableMap.of("schema.registry.url", getRegistryAddressInDockerNetwork())
+        );
+    }
+
+    private class EnhancedKafkaContainer extends KafkaContainer {
+
+        public EnhancedKafkaContainer(DockerImageName dockerImageName) {
+            super(dockerImageName);
+        }
+
+        @Override
+        public String getBootstrapServers() {
+            // we have to override this function
+            // because we want the Kafka Broker to advertise itself
+            // with the docker network address
+            // otherwise the Kafka Schema Registry won't work
+            return "PLAINTEXT://" + kafkaContainerName + ":9093";
+        }
+
+    }
+
+    protected EnhancedKafkaContainer createKafkaContainer(PulsarCluster cluster) {
+        return (EnhancedKafkaContainer) new EnhancedKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.0.1"))
+                .withEmbeddedZookeeper()
+                .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
+                        .withName(kafkaContainerName)
+                );
+    }
+
+    public void stopKafkaContainers(PulsarCluster cluster) {
+        if (null != schemaRegistryContainer) {
+            cluster.stopService(schemaRegistryContainerName, schemaRegistryContainer);
+        }
+        if (null != kafkaContainer) {
+            cluster.stopService(kafkaContainerName, kafkaContainer);
+        }
+    }
+
+    public void prepareSource() throws Exception {
+        log.info("creating topic");
+        ExecResult execResult = kafkaContainer.execInContainer(
+            "/usr/bin/kafka-topics",
+            "--create",
+            "--zookeeper",
+                getZooKeeperAddressInDockerNetwork(),
+            "--partitions",
+            "1",
+            "--replication-factor",
+            "1",
+            "--topic",
+            kafkaTopicName);
+        assertTrue(
+            execResult.getStdout().contains("Created topic"),
+            execResult.getStdout());
+
+    }
+
+    private String getZooKeeperAddressInDockerNetwork() {
+        return kafkaContainerName +":2181";
+    }
+
+    private <T extends GenericContainer> void testSource()  throws Exception {

Review comment:
       it is a leftover. I will remove it, thanks




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r570985143



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
##########
@@ -52,4 +67,29 @@ public GenericAvroWriter(Schema schema) {
             this.byteArrayOutputStream.reset();
         }
     }
+
+    /**
+     * This is an adapter from Pulsar GenericRecord to Avro classes.
+     */
+    private class GenericRecordAdapter extends SpecificRecordBase {

Review comment:
       static inner classes are usually preferred.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585053400



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+            if (payload == null) {
+                return null;
+            } else {
+                int id = -1;
+                try {
+                    ByteBuffer buffer = ByteBuffer.wrap(payload);
+                    buffer.get(); // magic number
+                    id = buffer.getInt();
+                    String subject = getSubjectName(topic, isKey != null ? isKey : false);
+                    Schema schema = this.schemaRegistry.getBySubjectAndId(subject, id);

Review comment:
       Why not implement `Source<ByteBuf>` or `Source<ByteBuffer>`?
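   For context, that would mean parameterizing the connector as `Source<ByteBuffer>`; a sketch of the shape only, with all Kafka plumbing left as placeholders:

   ```java
   import java.nio.ByteBuffer;
   import java.util.Map;
   import org.apache.pulsar.functions.api.Record;
   import org.apache.pulsar.io.core.Source;
   import org.apache.pulsar.io.core.SourceContext;

   // sketch only: a source whose record payloads are ByteBuffers
   public class KafkaByteBufferSource implements Source<ByteBuffer> {
       @Override
       public void open(Map<String, Object> config, SourceContext sourceContext) {
           // connect the Kafka consumer here
       }

       @Override
       public Record<ByteBuffer> read() throws Exception {
           // poll Kafka and wrap each value as a Record<ByteBuffer>
           throw new UnsupportedOperationException("placeholder");
       }

       @Override
       public void close() {
           // shut down the consumer
       }
   }
   ```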

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();

Review comment:
       The deserializer is constructed by reflection. There are a lot of unknowns about how Kafka constructs the deserializer, hence I would suggest keeping the deserializer as simple as possible. What you need is just to get the schema id and the remaining content. See the example at https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/serde/KafkaSchemaAndBytesDeserializer.java
   
   You don't need to pass the cache into the deserializer. You can implement the schema-extraction logic in the `extractSchema` method. That way the schema cache can be maintained in the connector, and you can resolve the schema information on the same Pulsar thread, rather than hiding the logic deep inside a deserializer where you don't know on which thread the deserialize method will run.
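   A minimal sketch of that shape (the holder class and names here are illustrative, following the Confluent wire format of a magic byte followed by a 4-byte schema id):

   ```java
   import java.nio.ByteBuffer;
   import org.apache.kafka.common.serialization.Deserializer;

   // hypothetical holder: schema id plus the remaining Avro payload
   final class SchemaIdAndBytes {
       final int schemaId;
       final ByteBuffer payload;

       SchemaIdAndBytes(int schemaId, ByteBuffer payload) {
           this.schemaId = schemaId;
           this.payload = payload;
       }
   }

   // the deserializer only splits the wire format; schema lookup stays in the connector
   public class SchemaIdAndBytesDeserializer implements Deserializer<SchemaIdAndBytes> {
       private static final byte MAGIC_BYTE = 0x0;

       @Override
       public SchemaIdAndBytes deserialize(String topic, byte[] data) {
           if (data == null) {
               return null;
           }
           ByteBuffer buffer = ByteBuffer.wrap(data);
           if (buffer.get() != MAGIC_BYTE) {
               throw new IllegalArgumentException("Unknown magic byte");
           }
           int schemaId = buffer.getInt();
           return new SchemaIdAndBytes(schemaId, buffer.slice());
       }
   }
   ```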




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r586175632



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithSchema.java
##########
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+
+import org.apache.pulsar.client.api.schema.*;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * This is a wrapper around a byte array (the Avro-encoded record) and a schema id in the Kafka Schema Registry.
+ */
+@Data

Review comment:
       nit: well I was wondering why [lombok.Value](https://projectlombok.org/features/Value) isn't used, since it does exactly that: "please add the getters and the default constructor with all of the fields".
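   For illustration, roughly what `@Value` buys here (field names taken from the getters used elsewhere in the diff; the exact field types are an assumption):

   ```java
   import java.nio.ByteBuffer;
   import lombok.Value;

   // @Value makes the class final and the fields private final,
   // and generates getters plus an all-args constructor
   @Value
   public class BytesWithSchema {
       int schemaId;
       ByteBuffer value;
   }
   ```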




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r591195750



##########
File path: pulsar-io/kafka/pom.xml
##########
@@ -48,12 +49,36 @@
       <artifactId>jackson-dataformat-yaml</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>${kafka-client.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry</artifactId>
+      <version>${kafka.confluent.schemaregistryclient.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>

Review comment:
       sorry, why do we need this dependency? And if we do need it, why is it provided?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r586187210



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -151,18 +153,32 @@ public void start() {
         runnerThread.start();
     }
 
-    public abstract V extractValue(ConsumerRecord<String, byte[]> record);
+    public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+        return consumerRecord.value();
+    }
 
+    public Optional<String> extractKey(ConsumerRecord<Object, Object> consumerRecord) {
+        // we are currently supporting only String keys
+        return Optional.ofNullable((String) consumerRecord.key());

Review comment:
       Wouldn't it be cleaner to simply handle what is currently supported? One reason could be to address the compiler warnings related to unchecked or unsafe operations without suppressing the compiler warnings. If compiler warnings are ignored, it probably doesn't make a difference.
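   As a sketch (not the patch's code), a stricter variant would accept only the supported key type and fail fast on anything else:

   ```java
   import java.util.Optional;
   import org.apache.kafka.clients.consumer.ConsumerRecord;

   public Optional<String> extractKey(ConsumerRecord<Object, Object> consumerRecord) {
       Object key = consumerRecord.key();
       if (key == null) {
           return Optional.empty();
       }
       if (key instanceof String) {
           return Optional.of((String) key);
       }
       // fail fast instead of a ClassCastException later on
       throw new IllegalArgumentException("Unsupported key type: " + key.getClass().getName());
   }
   ```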




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-779133837


   @sijie thank you for your pointers.
   
   I managed to implement Avro support in `KafkaBytesSource`.
   
   I would like to commit this initial patch and then add support for all of the primitive Schemas (1) and for JSON.
   If you prefer, I can continue to enhance this patch.
   
   (1) https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/KafkaSource.java#L338
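   As a rough sketch of that follow-up, mapping Kafka's primitive deserializers to the corresponding Pulsar schemas could look like this (class and field names are assumptions, mirroring the linked KafkaSource):

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.common.serialization.ByteArrayDeserializer;
   import org.apache.kafka.common.serialization.DoubleDeserializer;
   import org.apache.kafka.common.serialization.IntegerDeserializer;
   import org.apache.kafka.common.serialization.LongDeserializer;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.apache.pulsar.client.api.Schema;

   // one Pulsar schema per supported Kafka primitive deserializer
   final class PrimitiveSchemas {
       static final Map<String, Schema<?>> BY_DESERIALIZER = new HashMap<>();

       static {
           BY_DESERIALIZER.put(StringDeserializer.class.getName(), Schema.STRING);
           BY_DESERIALIZER.put(IntegerDeserializer.class.getName(), Schema.INT32);
           BY_DESERIALIZER.put(LongDeserializer.class.getName(), Schema.INT64);
           BY_DESERIALIZER.put(DoubleDeserializer.class.getName(), Schema.DOUBLE);
           BY_DESERIALIZER.put(ByteArrayDeserializer.class.getName(), Schema.BYTES);
       }

       private PrimitiveSchemas() {
       }
   }
   ```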


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r570214378



##########
File path: pom.xml
##########
@@ -152,6 +152,7 @@ flexible messaging model and an intuitive client API.</description>
     <jcip.version>1.0</jcip.version>
     <prometheus-jmx.version>0.14.0</prometheus-jmx.version>
     <confluent.version>5.3.2</confluent.version>
+    <kafka.confluent.schemaregstryclient.version>3.3.1</kafka.confluent.schemaregstryclient.version>

Review comment:
       typo (regstry -> registry)

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
##########
@@ -52,4 +67,29 @@ public GenericAvroWriter(Schema schema) {
             this.byteArrayOutputStream.reset();
         }
     }
+
+    /**
+     * This is an adapter from Pulsar GenericRecord to Avro classes.
+     */
+    private class GenericRecordAdapter extends SpecificRecordBase {

Review comment:
       shouldn't this be a `private static class` ?

##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/AvroKafkaSourceTest.java
##########
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import com.google.gson.Gson;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testcontainers.utility.DockerImageName;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SourceStatus;
+
+import static org.testng.Assert.*;
+
+/**
+ * A tester for testing kafka source with Avro Messages.
+ * This test starts a PulsarCluster, a container with a Kafka Broker
+ * and a container with the SchemaRegistry.
+ * It populates a Kafka topic with Avro encoded messages with schema
+ * and then it verifies that the records are correctly received
+ * by a Pulsar Consumer
+ */
+@Slf4j
+public class AvroKafkaSourceTest extends PulsarFunctionsTestBase {
+
+    private static final String SOURCE_TYPE = "kafka";
+
+    final Duration ONE_MINUTE = Duration.ofMinutes(1);
+    final Duration TEN_SECONDS = Duration.ofSeconds(10);
+
+    final RetryPolicy statusRetryPolicy = new RetryPolicy()
+            .withMaxDuration(ONE_MINUTE)
+            .withDelay(TEN_SECONDS)
+            .onRetry(e -> log.error("Retry ... "));
+
+    private final String kafkaTopicName = "kafkasourcetopic";
+
+    private EnhancedKafkaContainer kafkaContainer;
+    private SchemaRegistryContainer schemaRegistryContainer;
+
+    protected final Map<String, Object> sourceConfig;
+    protected final String kafkaContainerName = "kafkacontainer";
+    protected final String schemaRegistryContainerName = "schemaregistry";
+
+    public AvroKafkaSourceTest() {
+        sourceConfig = new HashMap<>();
+    }
+
+    @Test(groups = "source")
+    public void test() throws Exception {
+        if (pulsarCluster == null) {
+            super.setupCluster();
+            super.setupFunctionWorkers();
+        }
+        startKafkaContainers(pulsarCluster);
+        try {
+            testSource();
+        } finally {
+            stopKafkaContainers(pulsarCluster);
+        }
+    }
+
+    private String getBootstrapServersOnDockerNetwork() {
+        return kafkaContainerName + ":9093";
+    }
+
+
+    public void startKafkaContainers(PulsarCluster cluster) throws Exception {
+        this.kafkaContainer = createKafkaContainer(cluster);
+        cluster.startService(kafkaContainerName, kafkaContainer);
+        log.info("creating schema registry kafka {}",  getBootstrapServersOnDockerNetwork());
+        this.schemaRegistryContainer = new SchemaRegistryContainer(getBootstrapServersOnDockerNetwork());
+        cluster.startService(schemaRegistryContainerName, schemaRegistryContainer);
+        sourceConfig.put("bootstrapServers", getBootstrapServersOnDockerNetwork());
+        sourceConfig.put("groupId", "test-source-group");
+        sourceConfig.put("fetchMinBytes", 1L);
+        sourceConfig.put("autoCommitIntervalMs", 10L);
+        sourceConfig.put("sessionTimeoutMs", 10000L);
+        sourceConfig.put("heartbeatIntervalMs", 5000L);
+        sourceConfig.put("topic", kafkaTopicName);
+        sourceConfig.put("consumerConfigProperties",
+                ImmutableMap.of("schema.registry.url", getRegistryAddressInDockerNetwork())
+        );
+    }
+
+    private class EnhancedKafkaContainer extends KafkaContainer {
+
+        public EnhancedKafkaContainer(DockerImageName dockerImageName) {
+            super(dockerImageName);
+        }
+
+        @Override
+        public String getBootstrapServers() {
+            // we have to override this function
+            // because we want the Kafka Broker to advertise itself
+            // with the docker network address
+            // otherwise the Kafka Schema Registry won't work
+            return "PLAINTEXT://" + kafkaContainerName + ":9093";
+        }
+
+    }
+
+    protected EnhancedKafkaContainer createKafkaContainer(PulsarCluster cluster) {
+        return (EnhancedKafkaContainer) new EnhancedKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.0.1"))
+                .withEmbeddedZookeeper()
+                .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
+                        .withName(kafkaContainerName)
+                );
+    }
+
+    public void stopKafkaContainers(PulsarCluster cluster) {
+        if (null != schemaRegistryContainer) {
+            cluster.stopService(schemaRegistryContainerName, schemaRegistryContainer);
+        }
+        if (null != kafkaContainer) {
+            cluster.stopService(kafkaContainerName, kafkaContainer);
+        }
+    }
+
+    public void prepareSource() throws Exception {
+        log.info("creating topic");
+        ExecResult execResult = kafkaContainer.execInContainer(
+            "/usr/bin/kafka-topics",
+            "--create",
+            "--zookeeper",
+                getZooKeeperAddressInDockerNetwork(),
+            "--partitions",
+            "1",
+            "--replication-factor",
+            "1",
+            "--topic",
+            kafkaTopicName);
+        assertTrue(
+            execResult.getStdout().contains("Created topic"),
+            execResult.getStdout());
+
+    }
+
+    private String getZooKeeperAddressInDockerNetwork() {
+        return kafkaContainerName +":2181";
+    }
+
+    private <T extends GenericContainer> void testSource()  throws Exception {

Review comment:
       is the type parameter (`<T extends GenericContainer>`) required?

##########
File path: pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
##########
@@ -170,8 +171,30 @@ public void testVoidOutputClasses() throws Exception {
         PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
 
         try {
-            Schema schema = pulsarSink.initializeSchema();
+            PulsarSink.InitSchemaResult initSchemaResult = pulsarSink.initializeSchema();
+            Schema schema = initSchemaResult.schema;
             assertNull(schema);
+            // void type does not require the sink runtime to be created
+            assertTrue(!initSchemaResult.requireSink);
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            assertNull(ex);
+            fail();

Review comment:
       does TestNG support passing the exception as the argument to fail? Could `ex.printStackTrace()` be omitted in that case?
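   It does: `org.testng.Assert` provides `fail(String message, Throwable realCause)`, so the catch block could collapse to something like the sketch below (the exercised call is elided):

   ```java
   import static org.testng.Assert.fail;

   public class PulsarSinkVoidSchemaTest {

       @org.testng.annotations.Test
       public void testVoidOutputClasses() {
           try {
               // ... exercise pulsarSink.initializeSchema() and assert on the result ...
           } catch (Exception ex) {
               // TestNG records the cause and its stack trace, so the manual
               // printStackTrace()/assertNull(ex) pair becomes unnecessary
               fail("initializeSchema should not have thrown", ex);
           }
       }
   }
   ```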

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/PulsarSchemaCache.java
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+class PulsarSchemaCache<T> {
+
+    @Data
+    @AllArgsConstructor
+    public static final class CachedSchema<T> {
+        private final Schema<T> schema;
+        private final List<Field> fields;
+    }
+
+    private IdentityHashMap<org.apache.avro.Schema, CachedSchema<T>> cache = new IdentityHashMap<>();

Review comment:
       Why not use ConcurrentHashMap so that the `get` method wouldn't have to use `synchronized`?
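   A sketch of that alternative (note it trades the reference-equality lookup of `IdentityHashMap` for regular `equals`-based keys, and the schema factory below is a placeholder):

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Function;
   import org.apache.pulsar.client.api.Schema;

   class ConcurrentSchemaCache<T> {
       private final ConcurrentHashMap<org.apache.avro.Schema, Schema<T>> cache =
               new ConcurrentHashMap<>();
       private final Function<org.apache.avro.Schema, Schema<T>> factory;

       ConcurrentSchemaCache(Function<org.apache.avro.Schema, Schema<T>> factory) {
           this.factory = factory;
       }

       // computeIfAbsent gives atomic get-or-create without a synchronized method
       Schema<T> get(org.apache.avro.Schema avroSchema) {
           return cache.computeIfAbsent(avroSchema, factory);
       }
   }
   ```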

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -118,17 +117,18 @@ public void close() throws InterruptedException {
 
     public void start() {
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source");
+            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
             consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, byte[]> consumerRecords;
+            ConsumerRecords<String, KV> consumerRecords;
             while (running) {
                 consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
+                for (ConsumerRecord<String, KV> consumerRecord : consumerRecords) {
+                    LOG.info("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());

Review comment:
       for performance reasons in log statements, use debug level and wrap with `if (LOG.isDebugEnabled()) { ... }`
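   i.e. the guarded form, so the arguments (including any costly `toString()` of the record value) are only evaluated when DEBUG is enabled:

   ```java
   if (LOG.isDebugEnabled()) {
       LOG.debug("Record received from kafka, key: {}. value: {}",
               consumerRecord.key(), consumerRecord.value());
   }
   ```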

##########
File path: pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
##########
@@ -170,8 +171,30 @@ public void testVoidOutputClasses() throws Exception {
         PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
 
         try {
-            Schema schema = pulsarSink.initializeSchema();
+            PulsarSink.InitSchemaResult initSchemaResult = pulsarSink.initializeSchema();
+            Schema schema = initSchemaResult.schema;
             assertNull(schema);
+            // void type does not require the sink runtime to be created
+            assertTrue(!initSchemaResult.requireSink);
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            assertNull(ex);

Review comment:
       what is the meaning of `assertNull(ex)`?

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/PulsarSchemaCache.java
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+class PulsarSchemaCache<T> {
+
+    @Data
+    @AllArgsConstructor
+    public static final class CachedSchema<T> {
+        private final Schema<T> schema;
+        private final List<Field> fields;
+    }
+
+    private IdentityHashMap<org.apache.avro.Schema, CachedSchema<T>> cache = new IdentityHashMap<>();
+
+    public synchronized CachedSchema<T> get(org.apache.avro.Schema avroSchema) {
+        if (cache.size() > 100) {
+            // very simple auto cleanup
+            // schemas do not change very often, we just don't want this map to grow unbounded

Review comment:
       why does the cleanup happen at lookup time?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585448061



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();

Review comment:
       done

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.IdentityHashMap;
+
+@Slf4j
+class AvroSchemaCache<T> {
+    // we are using the Object identity as key of the cache
+    // we do not want to perform costly operations in order to lookup into the cache
+    private IdentityHashMap<org.apache.avro.Schema, Schema<T>> cache = new IdentityHashMap<>();
+
+    public synchronized Schema<T> get(org.apache.avro.Schema avroSchema) {
+        if (cache.size() > 100) {
+            // very simple auto cleanup
+            // schema do not change very often, we just want this map to grow

Review comment:
       thanks @michaeljmarshall .
   I have fixed it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r589785042



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+@Slf4j
+final class AvroSchemaCache {
+    private final LoadingCache<Integer, Schema<ByteBuffer>> cache = CacheBuilder
+            .newBuilder()
+            .maximumSize(100)
+            .build(new CacheLoader<Integer, Schema<ByteBuffer>>() {
+                @Override
+                public Schema<ByteBuffer> load(Integer schemaId) throws Exception {
+                    return fetchSchema(schemaId);
+                }
+            });
+
+    private final SchemaRegistryClient schemaRegistryClient;
+
+    public AvroSchemaCache(SchemaRegistryClient schemaRegistryClient) {
+        this.schemaRegistryClient = schemaRegistryClient;
+    }
+
+    public Schema<ByteBuffer> get(int schemaId) {
+        try {
+            return cache.get(schemaId);
+        } catch (ExecutionException err) {
+            throw new RuntimeException(err.getCause());
+        }
+    }
+
+    private Schema<ByteBuffer> fetchSchema(int schemaId) {
+        try {
+            org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId);
+            String definition = schema.toString(false);
+            log.info("Schema {} definition {}", schemaId, definition);
+            SchemaInfo schemaInfo = SchemaInfo.builder()
+                    .type(SchemaType.AVRO)
+                    .name(schema.getName())
+                    .properties(Collections.emptyMap())
+                    .schema(definition.getBytes(StandardCharsets.UTF_8)
+                    ).build();
+            return new Schema<ByteBuffer>() {

Review comment:
       I think you need to override the decode method and throw an UnsupportedOperationException. Otherwise, if the decode method is used, it will cause a StackOverflowError.
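
       A minimal sketch of that guard, slotted into the anonymous `Schema<ByteBuffer>` above (any other methods the targeted `Schema` version requires are elided):

   ```java
   return new Schema<ByteBuffer>() {
       // ... encode(), getSchemaInfo() and any other required overrides as above ...

       @Override
       public ByteBuffer decode(byte[] bytes, byte[] schemaVersion) {
           // the default decode(byte[]) and decode(byte[], byte[]) overloads
           // delegate to each other; overriding neither lets a stray decode()
           // call recurse until a StackOverflowError, so fail fast instead
           throw new UnsupportedOperationException(
                   "this Schema only wraps already-serialized Avro payloads");
       }
   };
   ```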

##########
File path: pulsar-io/kafka/pom.xml
##########
@@ -48,12 +49,42 @@
       <artifactId>jackson-dataformat-yaml</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>${kafka-client.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry</artifactId>
+      <version>${kafka.confluent.schemaregistryclient.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-streams-avro-serde</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>

Review comment:
       Why do you need this dependency?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585639918



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -118,17 +119,18 @@ public void close() throws InterruptedException {
 
     public void start() {
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source");
+            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
             consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, byte[]> consumerRecords;
             while (running) {
-                consumerRecords = consumer.poll(1000);
+                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
-                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
+                for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+                    KafkaRecord record = new KafkaRecord(consumerRecord,

Review comment:
       @lhotari 
   Adding the type parameter helps a little bit inside KafkaRecord, because it lets us check a few relations between the moving parts at compile time.
   Here I cannot use the parameter because I do not know the final binding: it depends on the configuration of the Consumer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-792241831


   /pulsarbot rerun-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-799194904


   Thank you very much @codelipenghui and @sijie


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-799133976


   @codelipenghui can you please help merge this patch?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-788852790


   @sijie 
   
   > Why not implement Source<ByteBuf> or Source<ByteBuffer>?
   
   I am not sure we can save a memory copy this way.
   
   In the end we have to return a byte[] during `Schema#encode`
   https://github.com/apache/pulsar/blob/c01b1eeda3221bdbf863bf0f3f8373e93d90adef/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java#L72
   
   The ByteBuffer we could return points to a non-zero offset in the original array (we are skipping the magic number and the schema id), so we cannot reuse the same array and we end up performing a copy.
   https://github.com/apache/pulsar/blob/cd259a443625dfa0710e79a87de65fd11b85d5bc/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java#L58
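   
   To make the copy concrete, a small sketch (the 5 skipped bytes are the Confluent framing: one magic byte plus a 4-byte schema id):
   
   ```java
   import java.nio.ByteBuffer;

   static byte[] stripConfluentHeader(byte[] payload) {
       ByteBuffer body = ByteBuffer.wrap(payload, 5, payload.length - 5);
       // body still shares the original array at a non-zero offset; Schema#encode
       // must return a byte[] whose data starts at index 0, so one copy is forced
       byte[] avroBody = new byte[body.remaining()];
       body.get(avroBody);
       return avroBody;
   }
   ```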
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-773312740


   @sijie 
   This is the first part of the patch
   https://github.com/apache/pulsar/pull/9481


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r591254811



##########
File path: pulsar-io/kafka/pom.xml
##########
@@ -48,12 +49,36 @@
       <artifactId>jackson-dataformat-yaml</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>${kafka-client.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry</artifactId>
+      <version>${kafka.confluent.schemaregistryclient.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>

Review comment:
       @sijie good catch.
   I don't know why I added that.
   Probably it is a leftover from the initial work.
   
   I have removed this section and I am able to build and run the tests




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585525313



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -151,18 +153,32 @@ public void start() {
         runnerThread.start();
     }
 
-    public abstract V extractValue(ConsumerRecord<String, byte[]> record);
+    public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+        return consumerRecord.value();
+    }
 
+    public Optional<String> extractKey(ConsumerRecord<Object, Object> consumerRecord) {
+        // we are currently supporting only String keys
+        return Optional.ofNullable((String) consumerRecord.key());
+    }
+
+    public abstract Schema<V> extractSchema(ConsumerRecord<Object, Object> consumerRecord);
+
+    @Slf4j
     static private class KafkaRecord<V> implements Record<V> {

Review comment:
       (see previous comment about possibly removing the  type parameter from KafkaRecord class if it's always Object)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-785692571


   @merlimat @rdhabalia @sijie 
   
   please help me move forward on this topic


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-783153997


   @sijie ping
   PTAL


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-793507919


   @sijie I have addressed your last comments.
   I created a new issue regarding the management of the key.
   https://github.com/apache/pulsar/issues/9848
   
   This way we can move forward one step at a time.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-776504126


   @sijie what about the current version of this patch ?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r590038785



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+@Slf4j
+final class AvroSchemaCache {
+    private final LoadingCache<Integer, Schema<ByteBuffer>> cache = CacheBuilder
+            .newBuilder()
+            .maximumSize(100)
+            .build(new CacheLoader<Integer, Schema<ByteBuffer>>() {
+                @Override
+                public Schema<ByteBuffer> load(Integer schemaId) throws Exception {
+                    return fetchSchema(schemaId);
+                }
+            });
+
+    private final SchemaRegistryClient schemaRegistryClient;
+
+    public AvroSchemaCache(SchemaRegistryClient schemaRegistryClient) {
+        this.schemaRegistryClient = schemaRegistryClient;
+    }
+
+    public Schema<ByteBuffer> get(int schemaId) {
+        try {
+            return cache.get(schemaId);
+        } catch (ExecutionException err) {
+            throw new RuntimeException(err.getCause());
+        }
+    }
+
+    private Schema<ByteBuffer> fetchSchema(int schemaId) {
+        try {
+            org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId);
+            String definition = schema.toString(false);
+            log.info("Schema {} definition {}", schemaId, definition);
+            SchemaInfo schemaInfo = SchemaInfo.builder()
+                    .type(SchemaType.AVRO)
+                    .name(schema.getName())
+                    .properties(Collections.emptyMap())
+                    .schema(definition.getBytes(StandardCharsets.UTF_8)
+                    ).build();
+            return new Schema<ByteBuffer>() {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585520586



##########
File path: pulsar-io/kafka/pom.xml
##########
@@ -54,6 +54,31 @@
       <version>${kafka-client.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry</artifactId>
+      <version>${kafka.confluent.schemaregistryclient.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-streams-avro-serde</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>

Review comment:
       Is the pulsar-client-original dependency needed in the compile scope? The problem is that it will pull a lot of dependencies into the .nar files that are built. Perhaps `provided` scope would be fine? However, I think that `provided` scope isn't transitive and therefore it might not have the desired impact in this library.
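
       For reference, the dependency with `provided` scope (the fix applied in a later revision of the patch) would read:

   ```xml
   <dependency>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar-client-original</artifactId>
     <version>${project.version}</version>
     <scope>provided</scope>
   </dependency>
   ```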




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r569527016



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -365,6 +365,17 @@ public void addCallback(MessageImpl<?> msg, SendCallback scb) {
         }
     }
 
+    /**
+     * Compress the payload if compression is configured
+     * @param payload
+     * @return a new payload
+     */
+    private ByteBuf applyCompression(ByteBuf payload) {

Review comment:
       changes to ProducerImpl are part of https://github.com/apache/pulsar/pull/9396
   I will revert them as soon as #9396 is committed

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -3854,4 +3857,49 @@ public void testIncomingMessageSize(boolean isPartitioned) throws Exception {
             Assert.assertEquals(size, 0);
         });
     }
+
+
+    @Data
+    @EqualsAndHashCode
+    public static class MyBean {
+        private String field;
+    }
+
+    @DataProvider(name = "enableBatching")
+    public static Object[] isEnableBatching() {
+        return new Object[]{false, true};
+    }
+
+    @Test(dataProvider = "enableBatching")
+    public void testSendCompressedWithDeferredSchemaSetup(boolean enableBatching) throws Exception {

Review comment:
       this is part of https://github.com/apache/pulsar/pull/9396

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
##########
@@ -18,32 +18,47 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaWriter;
 
 import java.io.ByteArrayOutputStream;
-
+@Slf4j
 public class GenericAvroWriter implements SchemaWriter<GenericRecord> {
 
     private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> writer;
     private BinaryEncoder encoder;
     private final ByteArrayOutputStream byteArrayOutputStream;
+    private final Schema schema;
+    private final GenericRecordAdapter adapter;
 
     public GenericAvroWriter(Schema schema) {
+        this.schema = schema;
         this.writer = new GenericDatumWriter<>(schema);
         this.byteArrayOutputStream = new ByteArrayOutputStream();
         this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
+        this.adapter = new GenericRecordAdapter();
     }
 
     @Override
     public synchronized byte[] write(GenericRecord message) {
         try {
-            writer.write(((GenericAvroRecord)message).getAvroRecord(), this.encoder);
+            if (message instanceof GenericAvroRecord) {
+                writer.write(((GenericAvroRecord) message).getAvroRecord(), this.encoder);
+            } else {
+                adapter.setCurrentMessage(message);

Review comment:
       @sijie here we are not extending the Pulsar Schema API; we are simply allowing this method to work with any implementation of GenericRecord.

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
##########
@@ -385,28 +388,43 @@ public void close() throws Exception {
         }
     }
 
+    @AllArgsConstructor
+    static class InitSchemaResult<T> {
+        final Schema<T> schema;
+        final boolean requireSink;
+    }
+
     @SuppressWarnings("unchecked")
     @VisibleForTesting
-    Schema<T> initializeSchema() throws ClassNotFoundException {
+    InitSchemaResult<T> initializeSchema() throws ClassNotFoundException {
         if (StringUtils.isEmpty(this.pulsarSinkConfig.getTypeClassName())) {
-            return (Schema<T>) Schema.BYTES;
+            return new InitSchemaResult((Schema<T>) Schema.BYTES, true);
         }
 
         Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), functionClassLoader);
         if (Void.class.equals(typeArg)) {
             // return type is 'void', so there's no schema to check
-            return null;
+            log.info("typeClassName is {}, no need to force a schema", this.pulsarSinkConfig.getTypeClassName());
+            return new InitSchemaResult(null, false);
+        }
+        if (GenericRecord.class.equals(typeArg)) {

Review comment:
       with this change we can allow Pulsar Sources to produce GenericRecord
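
       In practice this change lets a connector declare `Source<GenericRecord>` and attach a schema per record; a hedged sketch (illustrative, not part of the patch):

   ```java
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.schema.GenericRecord;
   import org.apache.pulsar.functions.api.Record;

   // a record that reports its own Schema via Record#getSchema; PulsarSink no
   // longer forces an empty schema on the topic for GenericRecord-typed sources
   class SchemaAwareRecord implements Record<GenericRecord> {
       private final GenericRecord value;
       private final Schema<GenericRecord> schema;

       SchemaAwareRecord(GenericRecord value, Schema<GenericRecord> schema) {
           this.value = value;
           this.schema = schema;
       }

       @Override
       public GenericRecord getValue() {
           return value;
       }

       @Override
       public Schema<GenericRecord> getSchema() {
           return schema;
       }
   }
   ```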




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585672197



##########
File path: pulsar-io/kafka/pom.xml
##########
@@ -54,6 +54,31 @@
       <version>${kafka-client.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry</artifactId>
+      <version>${kafka.confluent.schemaregistryclient.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-streams-avro-serde</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>

Review comment:
       I have moved this to provided, good catch




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585638214



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithSchema.java
##########
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+
+import org.apache.pulsar.client.api.schema.*;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * This is a wrapper around a Byte array (the Avro encoded record) and a schema id in the Kafka Schema Registry.
+ */
+@Data

Review comment:
       @lhotari 
   It is a shortcut here for "please add the getters and a constructor with all of the fields".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585522257



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithSchema.java
##########
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+
+import org.apache.pulsar.client.api.schema.*;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * This is a wrapper around a Byte array (the Avro encoded record) and a schema id in the Kafka Schema Registry.
+ */
+@Data

Review comment:
       `lombok.Value` is commonly used for immutable classes. I guess it doesn't make a difference in this case since fields are explicitly set to final.
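
       For comparison, a sketch of the `@Value` form (field names are illustrative, since the real class body is only partially shown above):

   ```java
   import java.nio.ByteBuffer;
   import lombok.Value;
   import org.apache.pulsar.client.api.Schema;

   @Value // generates getters, the all-args constructor, equals/hashCode/toString,
          // and makes the class final and the fields private final
   class BytesWithSchema {
       Schema<?> pulsarSchema;
       ByteBuffer value;
   }
   ```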




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585524925



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -151,18 +153,32 @@ public void start() {
         runnerThread.start();
     }
 
-    public abstract V extractValue(ConsumerRecord<String, byte[]> record);
+    public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+        return consumerRecord.value();
+    }
 
+    public Optional<String> extractKey(ConsumerRecord<Object, Object> consumerRecord) {
+        // we are currently supporting only String keys
+        return Optional.ofNullable((String) consumerRecord.key());

Review comment:
       since the key is always a String, would it make sense to set that already in the ConsumerRecord type parameter?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-795163968


   /pulsarbot rerun-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-797856876


   @freeznet @codelipenghui Can you review this PR?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-793669955


   @codelipenghui @sijie CI passed. Probably we are good to go now :)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-773321616


   Thank you @lhotari 
   Let's follow up in the linked pr.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585523971



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -118,17 +119,18 @@ public void close() throws InterruptedException {
 
     public void start() {
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source");
+            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
             consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, byte[]> consumerRecords;
             while (running) {
-                consumerRecords = consumer.poll(1000);
+                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
-                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
+                for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+                    KafkaRecord record = new KafkaRecord(consumerRecord,

Review comment:
       it seems that the type parameter isn't set. Would it make sense to make it KafkaRecord<Object> or drop the type parameter from the private KafkaRecord inner class?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-775008149


   @sijie I have addressed your main concern about deserialization: now we are passing the whole payload (except for the Kafka Schema Registry header).
   
   Important issues to be fixed in a follow-up:
   - handle keys
   - handle JSON
   - have better packaging of NARs that contain multiple sources #3678
   
   I will be happy to continue the work on this Source after we reach consensus over this patch


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-785697728


   Apologized for the delay! will review it today or tomorrow.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r577402048



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+            if (payload == null) {
+                return null;
+            } else {
+                int id = -1;
+                try {
+                    ByteBuffer buffer = ByteBuffer.wrap(payload);
+                    buffer.get(); // magic number
+                    id = buffer.getInt();
+                    String subject = getSubjectName(topic, isKey != null ? isKey : false);
+                    Schema schema = this.schemaRegistry.getBySubjectAndId(subject, id);

Review comment:
       The `schemaRegistry` has its own cache (it should be a `CachedSchemaRegistryClient`).
   
   My code is a simplified version of the original `deserialize` method.
   I wanted to keep it as close as possible to the original implementation.
   
   ```
   protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
           if (payload == null) {
               return null;
           } else {
                int id = -1;
   
               try {
                   ByteBuffer buffer = this.getByteBuffer(payload);
                    id = buffer.getInt();
                   String subject = includeSchemaAndVersion ? getSubjectName(topic, isKey) : null;
                   Schema schema = this.schemaRegistry.getBySubjectAndId(subject, id);
                   int length = buffer.limit() - 1 - 4;
                   Object result;
   ```

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+

Review comment:
       sure. I added a validation

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+            if (payload == null) {
+                return null;
+            } else {
+                int id = -1;
+                try {
+                    ByteBuffer buffer = ByteBuffer.wrap(payload);
+                    buffer.get(); // magic number
+                    id = buffer.getInt();
+                    String subject = getSubjectName(topic, isKey != null ? isKey : false);
+                    Schema schema = this.schemaRegistry.getBySubjectAndId(subject, id);

Review comment:
       In the end we have to pass a `byte[]` to the PulsarProducer as the value for the Record (we are implementing a `Source<byte[]>`), so we have to copy this data at least once.
   
   There is no way to avoid this memory copy, because we have to skip the magic number and the schema id.
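   
   For illustration, the single copy looks roughly like this (a sketch of the idea, not the exact PR code):
   
   ```java
   import java.nio.ByteBuffer;
   
   static byte[] extractAvroPayload(byte[] payload) {
       ByteBuffer buffer = ByteBuffer.wrap(payload);
       buffer.get();         // skip the 1-byte magic number
       buffer.getInt();      // skip the 4-byte schema id
       byte[] avroData = new byte[buffer.remaining()];
       buffer.get(avroData); // the one copy we cannot avoid
       return avroData;
   }
   ```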
   

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/PulsarSchemaCache.java
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.IdentityHashMap;
+
+@Slf4j
+class PulsarSchemaCache<T> {

Review comment:
       Updated

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {

Review comment:
       makes sense.
   I am adding a new `extractKey` method in order to make it clearer
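   
   i.e. roughly this (matching the diff shown further below):
   
   ```java
   public Optional<String> extractKey(ConsumerRecord<Object, Object> consumerRecord) {
       // we are currently supporting only String keys
       return Optional.ofNullable((String) consumerRecord.key());
   }
   ```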

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");

Review comment:
       removed (it was just an example value, useful only for local testing)

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();

Review comment:
       unfortunately I don't have a way to pass the PulsarAvroSchemaCache instance to the deserializer (because we do not have control over its lifecycle).
   
   We also need only one instance, and its lifecycle is tied to the lifecycle of the deserializer, so IMHO it can live internally to the deserializer.
   
   Actually this deserializer produces `BytesWithAvroPulsarSchema` instances. I believe it is fine to leave all of the schema management in it.
   
    




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r570214378



##########
File path: pom.xml
##########
@@ -152,6 +152,7 @@ flexible messaging model and an intuitive client API.</description>
     <jcip.version>1.0</jcip.version>
     <prometheus-jmx.version>0.14.0</prometheus-jmx.version>
     <confluent.version>5.3.2</confluent.version>
+    <kafka.confluent.schemaregstryclient.version>3.3.1</kafka.confluent.schemaregstryclient.version>

Review comment:
       typo (regstry -> registry)

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
##########
@@ -52,4 +67,29 @@ public GenericAvroWriter(Schema schema) {
             this.byteArrayOutputStream.reset();
         }
     }
+
+    /**
+     * This is an adapter from Pulsar GenericRecord to Avro classes.
+     */
+    private class GenericRecordAdapter extends SpecificRecordBase {

Review comment:
       shouldn't this be a `private static class`?

##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/AvroKafkaSourceTest.java
##########
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import com.google.gson.Gson;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testcontainers.utility.DockerImageName;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SourceStatus;
+
+import static org.testng.Assert.*;
+
+/**
+ * A tester for testing kafka source with Avro Messages.
+ * This test starts a PulsarCluster, a container with a Kafka Broker
+ * and a container with the SchemaRegistry.
+ * It populates a Kafka topic with Avro encoded messages with schema
+ * and then it verifies that the records are correctly received
+ * by a Pulsar Consumer.
+ */
+@Slf4j
+public class AvroKafkaSourceTest extends PulsarFunctionsTestBase {
+
+    private static final String SOURCE_TYPE = "kafka";
+
+    final Duration ONE_MINUTE = Duration.ofMinutes(1);
+    final Duration TEN_SECONDS = Duration.ofSeconds(10);
+
+    final RetryPolicy statusRetryPolicy = new RetryPolicy()
+            .withMaxDuration(ONE_MINUTE)
+            .withDelay(TEN_SECONDS)
+            .onRetry(e -> log.error("Retry ... "));
+
+    private final String kafkaTopicName = "kafkasourcetopic";
+
+    private EnhancedKafkaContainer kafkaContainer;
+    private SchemaRegistryContainer schemaRegistryContainer;
+
+    protected final Map<String, Object> sourceConfig;
+    protected final String kafkaContainerName = "kafkacontainer";
+    protected final String schemaRegistryContainerName = "schemaregistry";
+
+    public AvroKafkaSourceTest() {
+        sourceConfig = new HashMap<>();
+    }
+
+    @Test(groups = "source")
+    public void test() throws Exception {
+        if (pulsarCluster == null) {
+            super.setupCluster();
+            super.setupFunctionWorkers();
+        }
+        startKafkaContainers(pulsarCluster);
+        try {
+            testSource();
+        } finally {
+            stopKafkaContainers(pulsarCluster);
+        }
+    }
+
+    private String getBootstrapServersOnDockerNetwork() {
+        return kafkaContainerName + ":9093";
+    }
+
+
+    public void startKafkaContainers(PulsarCluster cluster) throws Exception {
+        this.kafkaContainer = createKafkaContainer(cluster);
+        cluster.startService(kafkaContainerName, kafkaContainer);
+        log.info("creating schema registry kafka {}",  getBootstrapServersOnDockerNetwork());
+        this.schemaRegistryContainer = new SchemaRegistryContainer(getBootstrapServersOnDockerNetwork());
+        cluster.startService(schemaRegistryContainerName, schemaRegistryContainer);
+        sourceConfig.put("bootstrapServers", getBootstrapServersOnDockerNetwork());
+        sourceConfig.put("groupId", "test-source-group");
+        sourceConfig.put("fetchMinBytes", 1L);
+        sourceConfig.put("autoCommitIntervalMs", 10L);
+        sourceConfig.put("sessionTimeoutMs", 10000L);
+        sourceConfig.put("heartbeatIntervalMs", 5000L);
+        sourceConfig.put("topic", kafkaTopicName);
+        sourceConfig.put("consumerConfigProperties",
+                ImmutableMap.of("schema.registry.url", getRegistryAddressInDockerNetwork())
+        );
+    }
+
+    private class EnhancedKafkaContainer extends KafkaContainer {
+
+        public EnhancedKafkaContainer(DockerImageName dockerImageName) {
+            super(dockerImageName);
+        }
+
+        @Override
+        public String getBootstrapServers() {
+            // we have to override this function
+            // because we want the Kafka Broker to advertise itself
+            // with the docker network address
+            // otherwise the Kafka Schema Registry won't work
+            return "PLAINTEXT://" + kafkaContainerName + ":9093";
+        }
+
+    }
+
+    protected EnhancedKafkaContainer createKafkaContainer(PulsarCluster cluster) {
+        return (EnhancedKafkaContainer) new EnhancedKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.0.1"))
+                .withEmbeddedZookeeper()
+                .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
+                        .withName(kafkaContainerName)
+                );
+    }
+
+    public void stopKafkaContainers(PulsarCluster cluster) {
+        if (null != schemaRegistryContainer) {
+            cluster.stopService(schemaRegistryContainerName, schemaRegistryContainer);
+        }
+        if (null != kafkaContainer) {
+            cluster.stopService(kafkaContainerName, kafkaContainer);
+        }
+    }
+
+    public void prepareSource() throws Exception {
+        log.info("creating topic");
+        ExecResult execResult = kafkaContainer.execInContainer(
+            "/usr/bin/kafka-topics",
+            "--create",
+            "--zookeeper",
+                getZooKeeperAddressInDockerNetwork(),
+            "--partitions",
+            "1",
+            "--replication-factor",
+            "1",
+            "--topic",
+            kafkaTopicName);
+        assertTrue(
+            execResult.getStdout().contains("Created topic"),
+            execResult.getStdout());
+
+    }
+
+    private String getZooKeeperAddressInDockerNetwork() {
+        return kafkaContainerName +":2181";
+    }
+
+    private <T extends GenericContainer> void testSource()  throws Exception {

Review comment:
       is the type parameter (`<T extends GenericContainer>`) required? 

##########
File path: pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
##########
@@ -170,8 +171,30 @@ public void testVoidOutputClasses() throws Exception {
         PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
 
         try {
-            Schema schema = pulsarSink.initializeSchema();
+            PulsarSink.InitSchemaResult initSchemaResult = pulsarSink.initializeSchema();
+            Schema schema = initSchemaResult.schema;
             assertNull(schema);
+            // void type do not require the sink runtime to be created
+            assertTrue(!initSchemaResult.requireSink);
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            assertNull(ex);
+            fail();

Review comment:
       does TestNG support passing the exception as the argument to fail? Could `ex.printStackTrace()` be omitted in that case?
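   
   (TestNG does provide an `Assert.fail(String message, Throwable realCause)` overload, so the catch block could be reduced to something like:)
   
   ```java
   } catch (Exception ex) {
       fail("initializeSchema should not have thrown", ex);
   }
   ```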

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/PulsarSchemaCache.java
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+class PulsarSchemaCache<T> {
+
+    @Data
+    @AllArgsConstructor
+    public static final class CachedSchema<T> {
+        private final Schema<T> schema;
+        private final List<Field> fields;
+    }
+
+    private IdentityHashMap<org.apache.avro.Schema, CachedSchema<T>> cache = new IdentityHashMap<>();

Review comment:
       Why not use ConcurrentHashMap so that the `get` method wouldn't have to use `synchronized`?
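   
   A sketch of what that could look like (`buildCachedSchema` is a hypothetical factory method; note the trade-off that `ConcurrentHashMap` keys on `equals`/`hashCode` rather than identity, and Avro `Schema.equals` is comparatively expensive):
   
   ```java
   private final ConcurrentHashMap<org.apache.avro.Schema, CachedSchema<T>> cache =
           new ConcurrentHashMap<>();
   
   public CachedSchema<T> get(org.apache.avro.Schema avroSchema) {
       // lock-free reads on the hot path, atomic creation on first access
       return cache.computeIfAbsent(avroSchema, this::buildCachedSchema);
   }
   ```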

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -118,17 +117,18 @@ public void close() throws InterruptedException {
 
     public void start() {
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source");
+            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
             consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, byte[]> consumerRecords;
+            ConsumerRecords<String, KV> consumerRecords;
             while (running) {
                 consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
+                for (ConsumerRecord<String, KV> consumerRecord : consumerRecords) {
+                    LOG.info("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());

Review comment:
       for performance reasons in log statements, use debug level and wrap with `if (LOG.isDebugEnabled()) { ... }`
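   
   i.e.:
   
   ```java
   if (LOG.isDebugEnabled()) {
       LOG.debug("Record received from kafka, key: {}. value: {}",
               consumerRecord.key(), consumerRecord.value());
   }
   ```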

##########
File path: pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
##########
@@ -170,8 +171,30 @@ public void testVoidOutputClasses() throws Exception {
         PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
 
         try {
-            Schema schema = pulsarSink.initializeSchema();
+            PulsarSink.InitSchemaResult initSchemaResult = pulsarSink.initializeSchema();
+            Schema schema = initSchemaResult.schema;
             assertNull(schema);
+            // void type do not require the sink runtime to be created
+            assertTrue(!initSchemaResult.requireSink);
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            assertNull(ex);

Review comment:
       what is the meaning of `assertNull(ex)`?

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/PulsarSchemaCache.java
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+class PulsarSchemaCache<T> {
+
+    @Data
+    @AllArgsConstructor
+    public static final class CachedSchema<T> {
+        private final Schema<T> schema;
+        private final List<Field> fields;
+    }
+
+    private IdentityHashMap<org.apache.avro.Schema, CachedSchema<T>> cache = new IdentityHashMap<>();
+
+    public synchronized CachedSchema<T> get(org.apache.avro.Schema avroSchema) {
+        if (cache.size() > 100) {
+            // very simple auto cleanup
+            // schema do not change very often, we just want this map to grow

Review comment:
       why does the cleanup happen at lookup time?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585525313



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -151,18 +153,32 @@ public void start() {
         runnerThread.start();
     }
 
-    public abstract V extractValue(ConsumerRecord<String, byte[]> record);
+    public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+        return consumerRecord.value();
+    }
 
+    public Optional<String> extractKey(ConsumerRecord<Object, Object> consumerRecord) {
+        // we are currently supporting only String keys
+        return Optional.ofNullable((String) consumerRecord.key());
+    }
+
+    public abstract Schema<V> extractSchema(ConsumerRecord<Object, Object> consumerRecord);
+
+    @Slf4j
     static private class KafkaRecord<V> implements Record<V> {

Review comment:
       (see previous comment about the type parameter to KafkaRecord class)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585267498



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.IdentityHashMap;
+
+@Slf4j
+class AvroSchemaCache<T> {
+    // we are using the Object identity as key of the cache
+    // we do not want to perform costly operations in order to lookup into the cache
+    private IdentityHashMap<org.apache.avro.Schema, Schema<T>> cache = new IdentityHashMap<>();
+
+    public synchronized Schema<T> get(org.apache.avro.Schema avroSchema) {
+        if (cache.size() > 100) {
+            // very simple auto cleanup
+            // schema do not change very often, we just want this map to grow

Review comment:
       ```suggestion
               // schema do not change very often, we just do not want this map to grow
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-791402895


   /pulsarbot run-failure-tests


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-782001765


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-791529147


   /pulsarbot rerun-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r577270066



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");

Review comment:
       If `KafkaBytesSource` is a connector for transferring bytes with different schemas, why do we need to set a schema registry URL here? `schema.registry.url` is only needed for the AVRO schema.

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+            if (payload == null) {
+                return null;
+            } else {
+                int id = -1;
+                try {
+                    ByteBuffer buffer = ByteBuffer.wrap(payload);
+                    buffer.get(); // magic number
+                    id = buffer.getInt();
+                    String subject = getSubjectName(topic, isKey != null ? isKey : false);
+                    Schema schema = this.schemaRegistry.getBySubjectAndId(subject, id);

Review comment:
       This doesn't look correct to me. It will request the schema from the schema registry every time a message is deserialized.
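   
   A rough sketch of caching the resolved schema per id (`lookupSchema` and `schemaById` are illustrative names; `getBySubjectAndId` throws checked exceptions that need wrapping):
   
   ```java
   private final ConcurrentHashMap<Integer, Schema> schemaById = new ConcurrentHashMap<>();
   
   private Schema lookupSchema(String subject, int id) {
       Schema cached = schemaById.get(id);
       if (cached != null) {
           return cached;
       }
       try {
           Schema resolved = schemaRegistry.getBySubjectAndId(subject, id);
           schemaById.putIfAbsent(id, resolved);
           return resolved;
       } catch (IOException | RestClientException e) {
           throw new SerializationException("Error retrieving Avro schema for id " + id, e);
       }
   }
   ```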

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {

Review comment:
       I think you can get a schemaId and a ByteBuffer just like what I did at https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/serde/KafkaSchemaAndBytesDeserializer.java.
   
   You can write a Schema wrapper to transfer the ByteBuffer like what I did at https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/schema/KafkaAvroSchema.java 
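   
   The idea, roughly (an illustrative sketch; the names are not taken from the linked code):
   
   ```java
   // carries the registry schema id together with the still-encoded Avro bytes,
   // so the connector can forward the payload without decoding and re-encoding it
   public final class SchemaIdAndBytes {
       private final int schemaId;
       private final ByteBuffer data;
   
       public SchemaIdAndBytes(int schemaId, ByteBuffer data) {
           this.schemaId = schemaId;
           this.data = data;
       }
   
       public int getSchemaId() {
           return schemaId;
       }
   
       public ByteBuffer getData() {
           return data;
       }
   }
   ```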

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+            if (payload == null) {
+                return null;
+            } else {
+                int id = -1;
+                try {
+                    ByteBuffer buffer = ByteBuffer.wrap(payload);
+                    buffer.get(); // magic number
+                    id = buffer.getInt();
+                    String subject = getSubjectName(topic, isKey != null ? isKey : false);
+                    Schema schema = this.schemaRegistry.getBySubjectAndId(subject, id);

Review comment:
       I think requesting the schema information should be done in the `extractSchema` call, not in the deserializer. 

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {

Review comment:
       Since we are refactoring the Kafka connector to support schemas, we should change the key to `Object` to allow supporting a key schema in the future. I understand you want to push this change in as early as you can. I am fine with that. But we need to be super clear about what we are doing and how we can extend it. 

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {

Review comment:
       I am not sure we should deserialize the buffer and convert it to a generic record. Since you are already parsing the buffer, we should just be able to transfer the bytes. I will look back to see how we did it in our Kafka connector.

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();

Review comment:
       PulsarAvroSchemaCache should be maintained at the connector level, not in the deserializer.

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+

Review comment:
       I think we should throw exceptions if the value deserializer is not supported.
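   
   e.g. something along these lines (`SUPPORTED_VALUE_DESERIALIZERS` is a hypothetical whitelist):
   
   ```java
   String deserializer = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
   if (!SUPPORTED_VALUE_DESERIALIZERS.contains(deserializer)) {
       throw new IllegalArgumentException("Unsupported value deserializer: " + deserializer);
   }
   ```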

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/PulsarSchemaCache.java
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.IdentityHashMap;
+
+@Slf4j
+class PulsarSchemaCache<T> {

Review comment:
       This is an AVRO schema cache, not a Pulsar schema cache.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-780172841


   I have merged this branch with current master.
   
   @sijie PTAL when you have time


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#issuecomment-790787276


   /pulsarbot run-failure-tests


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r590040956



##########
File path: pulsar-io/kafka/pom.xml
##########
@@ -48,12 +49,42 @@
       <artifactId>jackson-dataformat-yaml</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>${kafka-client.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry</artifactId>
+      <version>${kafka.confluent.schemaregistryclient.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-streams-avro-serde</artifactId>
+      <version>${kafka.confluent.avroserializer.version}</version>
+    </dependency>
+
+    <dependency>

Review comment:
       good catch. I needed it initially.
   now it is useless.
   
   dropped




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org