Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/19 23:33:46 UTC

[GitHub] [pulsar] dlg99 commented on a change in pull request #10267: Add documentation about Sink and Source

dlg99 commented on a change in pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#discussion_r616243256



##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles schemas automatically and provides a strongly typed API based on Java generics.
+If you know the schema type of the topic you are consuming from, you can declare the corresponding Java class in your Sink declaration.
+
+```java
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema, use the special `GenericObject` interface.
+
+```java
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        // ...
+    }
+}
+```
+
+For AVRO, JSON and Protobuf records (schemaType=AVRO, JSON, PROTOBUF_NATIVE), you can cast the
+`genericObject` variable to `GenericRecord` and use the `getFields()` and `getField()` APIs.
+You can access the native AVRO record using `genericObject.getNativeObject()`.
+
+For the KeyValue type, you can access both the schema of the key and the schema of the value with this code:
+
+```java
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();

Review comment:
       `genericObject` and `schema` can be null; add handling of these cases to the example.
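A minimal sketch of the null handling requested above, assuming the sink simply needs to branch before dereferencing either value. To keep it self-contained and runnable without the Pulsar dependency, `SchemaType`, `GenericObject` and `Record` here are hypothetical stand-ins that model only the accessors used in the snippet under review, and `handle`, `recordOf` and `objectOf` are illustrative helpers, not Pulsar API.

```java
// Self-contained sketch of a null-safe write() body.
// SchemaType, GenericObject and Record are hypothetical stand-ins that model only the
// accessors used in the snippet under review; a real sink uses the Pulsar types instead.
class NullSafeWriteSketch {

    enum SchemaType { STRING, AVRO, JSON, PROTOBUF_NATIVE, KEY_VALUE }

    interface GenericObject {
        SchemaType getSchemaType();
        Object getNativeObject();
    }

    interface Record<T> {
        Object getSchema(); // stand-in for the record's Schema; may be null
        T getValue();       // may be null
    }

    // Mirrors the body of write(), but returns a description instead of writing anywhere
    // so the branching is easy to see and to test.
    static String handle(Record<GenericObject> record) {
        Object schema = record.getSchema();
        GenericObject genericObject = record.getValue();
        if (genericObject == null) {
            // Nothing to dereference; the connector decides what a null value means.
            return "null value";
        }
        SchemaType type = genericObject.getSchemaType();
        Object nativeObject = genericObject.getNativeObject();
        if (schema == null) {
            // No record-level schema: rely only on the type reported by the value itself.
            return "no schema, type=" + type + ", value=" + nativeObject;
        }
        return "type=" + type + ", value=" + nativeObject;
    }

    // Hypothetical helpers for building stand-in records in examples and tests.
    static <T> Record<T> recordOf(Object schema, T value) {
        return new Record<T>() {
            @Override public Object getSchema() { return schema; }
            @Override public T getValue() { return value; }
        };
    }

    static GenericObject objectOf(SchemaType type, Object nativeObject) {
        return new GenericObject() {
            @Override public SchemaType getSchemaType() { return type; }
            @Override public Object getNativeObject() { return nativeObject; }
        };
    }
}
```

Returning early keeps the normal path free of nested null checks; whether a null value should be skipped, acked or failed is a per-connector decision that this sketch deliberately leaves open.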

##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles schemas automatically and provides a strongly typed API based on Java generics.
+If you know the schema type of the topic you are consuming from, you can declare the corresponding Java class in your Sink declaration.
+
+```java
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema, use the special `GenericObject` interface.
+
+```java
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        // ...
+    }
+}
+```
+
+For AVRO, JSON and Protobuf records (schemaType=AVRO, JSON, PROTOBUF_NATIVE), you can cast the
+`genericObject` variable to `GenericRecord` and use the `getFields()` and `getField()` APIs.
+You can access the native AVRO record using `genericObject.getNativeObject()`.
+
+For the KeyValue type, you can access both the schema of the key and the schema of the value with this code:
+
+```java
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+
+        KeyValue keyValue = (KeyValue) nativeObject;
+        Object key = keyValue.getKey();
+        Object value = keyValue.getValue();
+
+        KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
+        Schema keySchema = keyValueSchema.getKeySchema();
+        Schema valueSchema = keyValueSchema.getValueSchema();

Review comment:
       Wrap it in `if (type == SchemaType.KEY_VALUE)`?
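A sketch of the suggested `if (type == SchemaType.KEY_VALUE)` guard, assuming `handle` receives the three values the snippet already extracts (`schema`, `type`, `nativeObject`). As before, `SchemaType`, `Schema`, `KeyValueSchema` and `KeyValue` are hypothetical stand-ins that model only the members the snippet uses, so the example compiles without Pulsar on the classpath; `handle` is an illustrative name, not Pulsar API.

```java
// Sketch of the KEY_VALUE guard suggested in the review. SchemaType, Schema, KeyValueSchema
// and KeyValue are hypothetical stand-ins modelling only the members the snippet uses.
class KeyValueGuardSketch {

    enum SchemaType { STRING, AVRO, KEY_VALUE }

    interface Schema {
        String name();
    }

    // Stand-in for a key/value schema: one schema for the key, one for the value.
    static final class KeyValueSchema implements Schema {
        private final Schema keySchema;
        private final Schema valueSchema;

        KeyValueSchema(Schema keySchema, Schema valueSchema) {
            this.keySchema = keySchema;
            this.valueSchema = valueSchema;
        }

        @Override public String name() { return "KEY_VALUE"; }
        Schema getKeySchema() { return keySchema; }
        Schema getValueSchema() { return valueSchema; }
    }

    static final class KeyValue<K, V> {
        private final K key;
        private final V value;

        KeyValue(K key, V value) {
            this.key = key;
            this.value = value;
        }

        K getKey() { return key; }
        V getValue() { return value; }
    }

    // Receives the values write() already extracted: the record schema, the schema type
    // of the value and its native object.
    static String handle(Schema schema, SchemaType type, Object nativeObject) {
        if (type == SchemaType.KEY_VALUE) {
            // Only KEY_VALUE records reach the KeyValue cast.
            KeyValue<?, ?> keyValue = (KeyValue<?, ?>) nativeObject;
            Object key = keyValue.getKey();
            Object value = keyValue.getValue();
            // The schema may be null, so test it with instanceof instead of casting blindly.
            if (schema instanceof KeyValueSchema) {
                KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
                Schema keySchema = keyValueSchema.getKeySchema();
                Schema valueSchema = keyValueSchema.getValueSchema();
                return "key=" + key + ", value=" + value
                        + ", keySchema=" + keySchema.name() + ", valueSchema=" + valueSchema.name();
            }
            return "key=" + key + ", value=" + value + ", no KeyValueSchema";
        }
        // Any other schema type: handle nativeObject without the KeyValue casts.
        return "not a KeyValue: " + type;
    }
}
```

Checking with `instanceof` also covers the null case raised in the first comment, since `null instanceof KeyValueSchema` is `false`.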



