Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/24 06:45:59 UTC

[GitHub] [pulsar] eolivelli opened a new issue #14842: [improvement] Functions: ability to produce KeyValue output

eolivelli opened a new issue #14842:
URL: https://github.com/apache/pulsar/issues/14842


   **Is your enhancement request related to a problem? Please describe.**
   Currently (Pulsar 2.10), if I write a simple Function like this:
   
   ```
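   import java.util.function.Function;
   import lombok.AllArgsConstructor;
   import lombok.Data;
   import lombok.extern.slf4j.Slf4j;
   import org.apache.pulsar.client.api.schema.GenericObject;
   import org.apache.pulsar.common.schema.KeyValue;
   
   @Slf4j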
   public class MyFunction implements Function<GenericObject, KeyValue<MyFunction.MyKey, MyFunction.MyValue>> {
   
       @Data
       @AllArgsConstructor
       public static final class MyKey {
           String name;
       }
       @Data
       @AllArgsConstructor
       public static final class MyValue {
           int age;
       }
   
       @Override
       public KeyValue<MyFunction.MyKey, MyFunction.MyValue> apply(GenericObject genericObject) {
           log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
           return new KeyValue<>(new MyKey("foo"), new MyValue(543));
       }
   }
   ```
   
   I can successfully consume a KeyValue<GenericRecord,GenericRecord> topic (such as a topic written by Debezium or the Kafka Source), but I cannot produce to the output topic, because Pulsar sets both the key type and the value type of the KeyValue to byte[].
   It seems to ignore the generic type parameters of the KeyValue output type.
   
   This is the error we can see
   
   ```
   8:00:30.993 [public/default/ciao2-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/ciao2:0] Uncaught exception in Java Instance
   java.lang.ClassCastException: class MyFunction$MyKey cannot be cast to class [B (MyFunction$MyKey is in unnamed module of loader org.apache.pulsar.common.nar.NarClassLoader @70f822e; [B is in module java.base of loader 'bootstrap')
   	at org.apache.pulsar.client.impl.schema.BytesSchema.encode(BytesSchema.java:28) ~[pulsar-client-original.jar:2.8.0.1.1.29-SNAPSHOT]
   	at org.apache.pulsar.common.schema.KeyValue.encode(KeyValue.java:107) ~[java-instance.jar:?]
   ```
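
The failure mode in the stack trace can be reproduced without Pulsar. The following is a minimal sketch, assuming an encoder is picked without knowing the real type of the value; `Encoder`, `BytesEncoder` and `encodeWithFallback` are hypothetical stand-ins for illustration, not Pulsar classes or the actual code path in the function runtime.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for illustration only; none of these are Pulsar classes.
class ErasureDemo {

    /** Minimal encoder contract, loosely modelled on a schema's encode step. */
    interface Encoder<T> {
        byte[] encode(T value);
    }

    /** Encoder that only accepts byte[], used here as the fallback. */
    static final class BytesEncoder implements Encoder<byte[]> {
        @Override
        public byte[] encode(byte[] value) {
            return value;
        }
    }

    /** Simple key type, mirroring MyKey from the function in the report. */
    static final class MyKey {
        final String name;

        MyKey(String name) {
            this.name = name;
        }
    }

    /**
     * Encodes a value with an encoder chosen without knowledge of the
     * value's real type. The unchecked cast compiles, but the call fails
     * at run time when the value is not a byte[].
     */
    @SuppressWarnings("unchecked")
    static byte[] encodeWithFallback(Object value) {
        Encoder<Object> encoder = (Encoder<Object>) (Encoder<?>) new BytesEncoder();
        return encoder.encode(value);
    }

    public static void main(String[] args) {
        // A real byte[] passes through the fallback encoder unchanged.
        byte[] ok = encodeWithFallback("foo".getBytes(StandardCharsets.UTF_8));
        System.out.println("encoded " + ok.length + " bytes");

        // A POJO key fails inside the encoder with a ClassCastException.
        try {
            encodeWithFallback(new MyKey("foo"));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

Because generics are erased, the cast to `byte[]` happens only inside the encoder, which is why the exception surfaces at encode time rather than when the function is deployed.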
   
   **Describe the solution you'd like**
   No error occurs out of the box: the output is encoded as KeyValue<JSON, JSON> or KeyValue<AVRO, AVRO>, possibly with SEPARATED encoding.
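
The key and value classes needed for such a schema are, in principle, recoverable from the function's declared output type. The sketch below shows one way to do that with plain Java reflection. `Pair` is a hypothetical stand-in for a key/value type and `outputTypeArguments` is an illustrative helper; neither is Pulsar API, and this is not necessarily how the function runtime resolves types.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Hypothetical sketch; Pair and outputTypeArguments are illustrative names.
class OutputTypeDemo {

    /** Stand-in for a key/value pair type; not Pulsar's KeyValue. */
    static final class Pair<K, V> {
        final K key;
        final V value;

        Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    static final class MyKey {
        String name;
    }

    static final class MyValue {
        int age;
    }

    /** Function declared with a fully parameterized output type. */
    static final class MyFunction implements Function<String, Pair<MyKey, MyValue>> {
        @Override
        public Pair<MyKey, MyValue> apply(String input) {
            return new Pair<>(new MyKey(), new MyValue());
        }
    }

    /**
     * Returns the type arguments of the function's output type, or an
     * empty array when the output type is not parameterized. Only handles
     * classes that implement java.util.function.Function directly.
     */
    static Type[] outputTypeArguments(Class<?> functionClass) {
        for (Type iface : functionClass.getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) iface;
                if (p.getRawType() == Function.class) {
                    // Index 0 is the input type, index 1 the output type.
                    Type output = p.getActualTypeArguments()[1];
                    if (output instanceof ParameterizedType) {
                        return ((ParameterizedType) output).getActualTypeArguments();
                    }
                }
            }
        }
        return new Type[0];
    }

    public static void main(String[] args) {
        Type[] kv = outputTypeArguments(MyFunction.class);
        System.out.println("key type:   " + kv[0].getTypeName());
        System.out.println("value type: " + kv[1].getTypeName());
    }
}
```

With the two classes in hand, a runtime could build a typed key/value schema instead of falling back to byte[] for both sides.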
   
   **Describe alternatives you've considered**
   Setting a schema on the output topic before creating the function. I have not tested this; we should add integration tests for this case as well.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on issue #14842: [improvement] Functions: ability to produce KeyValue output

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #14842:
URL: https://github.com/apache/pulsar/issues/14842#issuecomment-1077290901


   I am working on a fix

