Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/24 10:24:17 UTC

[GitHub] [pulsar] eolivelli opened a new pull request #14847: Pulsar Functions: allow a Function to access the original Schema of the Message and use it

eolivelli opened a new pull request #14847:
URL: https://github.com/apache/pulsar/pull/14847


   ### Motivation
   
   Currently a Function cannot access the original Schema of the Message. It only receives AutoConsumeSchema, a special schema that is not suitable for producing messages.
   
   This is an example of an identity Function that picks any message, regardless of its Schema, and writes it to an output topic.
   
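   ```
   import lombok.extern.slf4j.Slf4j;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.schema.GenericObject;
   import org.apache.pulsar.functions.api.Context;
   import org.apache.pulsar.functions.api.Function;
   import org.apache.pulsar.functions.api.Record;
   
   @Slf4j
   public class MyFunctionIdentityTransform implements Function<GenericObject, Void> {
   
       @Override
       public Void process(GenericObject genericObject, Context context) throws Exception {
           Record<?> currentRecord = context.getCurrentRecord();
           log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
           log.info("record with schema {} {}", currentRecord.getSchema(), currentRecord);
           // Re-publish the native object with the record's original Schema.
           // The raw (Schema) cast lets us pass a value whose type is only known at runtime.
           context.newOutputMessage(context.getOutputTopic(), (Schema) currentRecord.getSchema())
                   .value(genericObject.getNativeObject()).send();
           return null;
       }
   }
   ```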
   
   Functions of this kind must also work with KeyValue<GenericRecord, GenericRecord> input messages and preserve the schema properties (such as KeyValueEncodingType.SEPARATED, or the SchemaType of each component).
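   
   The following minimal sketch shows why the encoding has to be carried over when a Function derives an output KeyValue schema from its input. EncodingSketch and KvSchemaSketch are hypothetical stand-in types, not Pulsar classes. The sketch assumes that a rebuild which knows only the key and value types falls back to INLINE encoding, which is our understanding of the two-argument Schema.KeyValue(key, value) overload. Copying the encoding from the input keeps SEPARATED intact.
   
   ```java
   import java.util.Objects;
   
   // Hypothetical stand-in for a KeyValue encoding enum (NOT a Pulsar class).
   enum EncodingSketch { INLINE, SEPARATED }
   
   // Hypothetical, simplified description of a KeyValue schema: component types plus encoding.
   final class KvSchemaSketch {
       final String keyType;
       final String valueType;
       final EncodingSketch encoding;
   
       KvSchemaSketch(String keyType, String valueType, EncodingSketch encoding) {
           this.keyType = Objects.requireNonNull(keyType);
           this.valueType = Objects.requireNonNull(valueType);
           this.encoding = Objects.requireNonNull(encoding);
       }
   
       // Naive rebuild that only knows the component types and falls back to a
       // default encoding (assumed INLINE here), silently dropping SEPARATED.
       static KvSchemaSketch withDefaultEncoding(String keyType, String valueType) {
           return new KvSchemaSketch(keyType, valueType, EncodingSketch.INLINE);
       }
   
       // Preserving rebuild: replace only the value type and copy the key type
       // and the encoding from the input schema.
       KvSchemaSketch withValueType(String newValueType) {
           return new KvSchemaSketch(keyType, newValueType, encoding);
       }
   
       @Override
       public String toString() {
           return "KeyValue<" + keyType + ", " + valueType + ">(" + encoding + ")";
       }
   }
   ```
   
   The field-removal example later in this thread does the equivalent with the real API by passing kvSchema.getKeyValueEncodingType() to Schema.KeyValue(...).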
   
   ### Modifications
   
   When PulsarSource picks a Message from the Pulsar topic, unwrap the AutoConsumeSchema and set the wrapped (original) Schema on the PulsarRecord.
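   
   The following simplified sketch models the unwrap step. SimpleSchema, ConcreteSchema, AutoConsumeLike, SketchRecord and SourceSketch are hypothetical types, not Pulsar's real classes. The sketch assumes the auto-consume wrapper resolves one concrete schema per message schema version. It only illustrates that the record ends up carrying the resolved schema instead of the wrapper.
   
   ```java
   import java.util.Map;
   import java.util.Objects;
   
   // Hypothetical minimal schema abstraction (NOT Pulsar's Schema interface).
   interface SimpleSchema {
       String name();
   }
   
   final class ConcreteSchema implements SimpleSchema {
       private final String name;
       ConcreteSchema(String name) { this.name = name; }
       public String name() { return name; }
   }
   
   // Hypothetical stand-in for AutoConsumeSchema: a wrapper that can decode any
   // message by resolving the concrete schema for the message's schema version.
   final class AutoConsumeLike implements SimpleSchema {
       private final Map<String, SimpleSchema> byVersion;
       AutoConsumeLike(Map<String, SimpleSchema> byVersion) { this.byVersion = byVersion; }
       public String name() { return "AUTO_CONSUME"; }
       SimpleSchema resolve(String schemaVersion) {
           return Objects.requireNonNull(byVersion.get(schemaVersion), "unknown schema version");
       }
   }
   
   // Hypothetical record handed to the Function: a value plus the schema it reports.
   final class SketchRecord {
       final Object value;
       final SimpleSchema schema;
       SketchRecord(Object value, SimpleSchema schema) { this.value = value; this.schema = schema; }
   }
   
   final class SourceSketch {
       // When building the record, replace the auto-consume wrapper with the
       // concrete schema it resolves for this message; other schemas pass through.
       static SketchRecord toRecord(Object value, String schemaVersion, SimpleSchema consumerSchema) {
           SimpleSchema effective = consumerSchema instanceof AutoConsumeLike
                   ? ((AutoConsumeLike) consumerSchema).resolve(schemaVersion)
                   : consumerSchema;
           return new SketchRecord(value, effective);
       }
   }
   ```
   
   Because the Function then sees a concrete Schema, it can hand that Schema straight to context.newOutputMessage(...), as the identity example above does.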
   
   ### Verifying this change
   
   I will add tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli merged pull request #14847: Pulsar Functions: allow a Function to access the original Schema of the Message and use it

Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #14847:
URL: https://github.com/apache/pulsar/pull/14847


   





[GitHub] [pulsar] eolivelli commented on pull request #14847: Pulsar Functions: allow a Function to access the original Schema of the Message and use it

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #14847:
URL: https://github.com/apache/pulsar/pull/14847#issuecomment-1077471167


   @congbobo184 This patch is an early preview. I am going to add integration tests that cover the example function.





[GitHub] [pulsar] github-actions[bot] commented on pull request #14847: Pulsar Functions: allow a Function to access the original Schema of the Message and use it

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14847:
URL: https://github.com/apache/pulsar/pull/14847#issuecomment-1077568052


   @eolivelli: Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)





[GitHub] [pulsar] eolivelli commented on pull request #14847: Pulsar Functions: allow a Function to access the original Schema of the Message and use it

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #14847:
URL: https://github.com/apache/pulsar/pull/14847#issuecomment-1077539317


   This is another kind of Function that this patch unblocks.
   The function reads any Object and, if it is a KeyValue<?, AVRO>, removes a field from the AVRO struct and then writes the KeyValue downstream.
   (This is only an example: we should cache the Schema instances and organise the code better.)
   
   
   ```
   import java.io.ByteArrayOutputStream;
   import java.util.stream.Collectors;
   import lombok.extern.slf4j.Slf4j;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.EncoderFactory;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.schema.GenericObject;
   import org.apache.pulsar.client.api.schema.GenericRecord;
   import org.apache.pulsar.client.api.schema.KeyValueSchema;
   import org.apache.pulsar.common.schema.KeyValue;
   import org.apache.pulsar.common.schema.SchemaType;
   import org.apache.pulsar.functions.api.Context;
   import org.apache.pulsar.functions.api.Function;
   import org.apache.pulsar.functions.api.Record;
   
   @Slf4j
   public class MyFunctionRemoveFieldTransform implements Function<GenericObject, Void> {
   
       private static final String FIELD_TO_REMOVE = "foo";
   
       @Override
       public Void process(GenericObject genericObject, Context context) throws Exception {
           Record<?> currentRecord = context.getCurrentRecord();
           log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
           log.info("record with schema {} {}", currentRecord.getSchema(), currentRecord);
           Object nativeObject = genericObject.getNativeObject();
           Schema<?> schema = currentRecord.getSchema();
   
           // By default, forward the message unchanged with its original Schema.
           Schema outputSchema = schema;
           Object outputObject = nativeObject;
   
           if (schema instanceof KeyValueSchema && nativeObject instanceof KeyValue) {
               KeyValueSchema kvSchema = (KeyValueSchema) schema;
               Schema keySchema = kvSchema.getKeySchema();
               Schema valueSchema = kvSchema.getValueSchema();
               // Remove the field "foo" from the value schema, only when the value is AVRO.
               if (valueSchema.getSchemaInfo().getType() == SchemaType.AVRO) {
                   org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) valueSchema.getNativeSchema().get();
                   if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
                       org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
                       org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
                       // Build a new record schema with every field except FIELD_TO_REMOVE.
                       // Fields are copied because an Avro Field can belong to only one record schema.
                       org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
                               originalAvroSchema.getName(), originalAvroSchema.getDoc(),
                               originalAvroSchema.getNamespace(), originalAvroSchema.isError(),
                               originalAvroSchema.getFields().stream()
                                       .filter(f -> !f.name().equals(FIELD_TO_REMOVE))
                                       .map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
                                       .collect(Collectors.toList()));
   
                       // Keep the original key schema and KeyValue encoding; swap only the value schema.
                       Schema newValueSchema = Schema.NATIVE_AVRO(modified);
                       outputSchema = Schema.KeyValue(keySchema, newValueSchema, kvSchema.getKeyValueEncodingType());
   
                       KeyValue originalObject = (KeyValue) nativeObject;
                       GenericRecord value = (GenericRecord) originalObject.getValue();
                       org.apache.avro.generic.GenericRecord genericRecord =
                               (org.apache.avro.generic.GenericRecord) value.getNativeObject();
   
                       // Copy the remaining fields into a record of the new schema.
                       org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
                       for (org.apache.avro.Schema.Field field : modified.getFields()) {
                           newRecord.put(field.name(), genericRecord.get(field.name()));
                       }
                       // NATIVE_AVRO takes an already-serialized Avro payload, so encode the record to bytes here.
                       GenericDatumWriter writer = new GenericDatumWriter(modified);
                       ByteArrayOutputStream oo = new ByteArrayOutputStream();
                       BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
                       writer.write(newRecord, encoder);
                       Object newValue = oo.toByteArray();
   
                       outputObject = new KeyValue(originalObject.getKey(), newValue);
                   }
               }
           }
           log.info("output {} schema {}", outputObject, outputSchema);
           context.newOutputMessage(context.getOutputTopic(), outputSchema)
                   .value(outputObject).send();
           return null;
       }
   }
   ```
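   
   The note above says the Schema instances should be cached instead of being rebuilt for every message. The following minimal sketch shows one such cache. It assumes the input schema, or a fingerprint or schema version standing in for it, has stable equals/hashCode. DerivedSchemaCache and FieldRemoval are hypothetical helpers, and plain lists of field names stand in for real Avro schemas.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Function;
   
   // Hypothetical helper: memoises an expensive schema transformation keyed by the
   // input schema, so the derived schema is built once instead of once per message.
   final class DerivedSchemaCache<S, D> {
       private final Map<S, D> cache = new ConcurrentHashMap<>();
       private final Function<S, D> derive;
   
       DerivedSchemaCache(Function<S, D> derive) {
           this.derive = derive;
       }
   
       D get(S inputSchema) {
           return cache.computeIfAbsent(inputSchema, derive);
       }
   }
   
   final class FieldRemoval {
       // Simplified stand-in for dropping a field from a record schema:
       // returns an immutable copy of the field names without fieldToRemove.
       static List<String> withoutField(List<String> fields, String fieldToRemove) {
           List<String> out = new ArrayList<>(fields);
           out.remove(fieldToRemove);
           return List.copyOf(out);
       }
   }
   ```
   
   ConcurrentHashMap.computeIfAbsent applies the derivation at most once per key, even with concurrent callers. The derivation function must not modify the cache itself.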





[GitHub] [pulsar] eolivelli closed pull request #14847: Pulsar Functions: allow a Function to access the original Schema of the Message and use it

Posted by GitBox <gi...@apache.org>.
eolivelli closed pull request #14847:
URL: https://github.com/apache/pulsar/pull/14847


   

