You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/19 13:23:00 UTC

[GitHub] [pulsar] eolivelli opened a new pull request #10267: Add documentation about Sink and Source

eolivelli opened a new pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267


   Add documentation about recently implemented features in Pulsar IO regarding:
   - Sink<GenericObject>: Sinks that are not aware (at build time)of the schema type
   - Source<byte[]>: Sources that are not aware (at build time) of the schema they are going to produce 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#discussion_r618123847



##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySource implements Source<String> {
+    public Record<String> read() {}
+}
+```
+If you want to implement a Source that works with any schema then you go with `byte[]` (of `ByteBuffer`) and use Schema.AUTO_PRODUCE_BYTES().
+
+```
+public class MySource implements Source<byte[]> {
+    public Record<byte[]> read() {
+        
+        Schema wantedSchema = ....
+        Record<byte[]> myRecord = new MyRecordImplementation(); 
+        ....
+    }
+    class MyRecordImplementation implements Record<byte[]> {
+         public byte[] getValue() {
+            return ....encoded byte[]...that represents the value 
+         }
+         public Schema<byte[]> getSchema() {
+             return Schema.AUTO_PRODUCE_BYTES(wantedSchema);
+         }
+    }
+}
+```
+
+In order to handle properly the KeyValue type your Record implementation must follow this convention:
+- it must implement {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java} interface and implement `getKeySchema`,`getValueSchema` and `getKeyValueEncodingType`

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#issuecomment-824782928


   @Anonymitaet I am sorry, probably I did some mess while committing.
   I have applied now the changes using the GitHub UI.
   
   It should be good to go now
   
   thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on a change in pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on a change in pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#discussion_r618229160



##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information

Review comment:
       @eolivelli seems that some comments are not incorporated? (like this)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet merged pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
Anonymitaet merged pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] dlg99 commented on a change in pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#discussion_r616243256



##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are consuming from you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema then you go with the special GenericObject interface.
+
+```
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        ....
+    }
+}
+```
+
+In case of AVRO, JSON and Protobuf records (schemaType=AVRO,JSON,PROTOBUF_NATIVE) you can cast the
+`genericObject` variable to `GenericRecord` and use `getFields()` and `getField()` API.
+You are able to access the native AVRO record using  `genericObject.getNativeObject()`.
+
+In case of KeyValue type you can access both the Schema for the Key and the schema for the Value using this code
+
+```
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();

Review comment:
       `genericObject`, `schema ` can be null, add handling of these cases into the example

##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are consuming from you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema then you go with the special GenericObject interface.
+
+```
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        ....
+    }
+}
+```
+
+In case of AVRO, JSON and Protobuf records (schemaType=AVRO,JSON,PROTOBUF_NATIVE) you can cast the
+`genericObject` variable to `GenericRecord` and use `getFields()` and `getField()` API.
+You are able to access the native AVRO record using  `genericObject.getNativeObject()`.
+
+In case of KeyValue type you can access both the Schema for the Key and the schema for the Value using this code
+
+```
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        
+        KeyValue keyValue = (KeyValue) nativeObject;        
+        Object key = keyValue.getKey();
+        Object value = keyValue.getValue();
+        
+        KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
+        Schema keySchema = keyValueSchema.getKeySchema();
+        Schema valueSchema = keyValueSchema.getValueSchema();        

Review comment:
       wrap it into `if (type == SchemaType.KEY_VALUE)` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#issuecomment-824587263


   @dlg99 @Anonymitaet thanks for your suggestions.
   I have addressed your comments.
   please take another look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on a change in pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on a change in pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#discussion_r616261206



##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.

Review comment:
       ```suggestion
   Pulsar IO automatically handles the schema and provides a strongly typed API based on Java generics.
   ```

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.

Review comment:
       ```suggestion
   If you know the schema type that you are producing, you can declare the Java class relative to that type in your sink declaration.
   ```

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySource implements Source<String> {
+    public Record<String> read() {}
+}
+```
+If you want to implement a Source that works with any schema then you go with `byte[]` (of `ByteBuffer`) and use Schema.AUTO_PRODUCE_BYTES().
+
+```
+public class MySource implements Source<byte[]> {
+    public Record<byte[]> read() {
+        
+        Schema wantedSchema = ....
+        Record<byte[]> myRecord = new MyRecordImplementation(); 
+        ....
+    }
+    class MyRecordImplementation implements Record<byte[]> {
+         public byte[] getValue() {
+            return ....encoded byte[]...that represents the value 
+         }
+         public Schema<byte[]> getSchema() {
+             return Schema.AUTO_PRODUCE_BYTES(wantedSchema);
+         }
+    }
+}
+```
+
+In order to handle properly the KeyValue type your Record implementation must follow this convention:
+- it must implement {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java} interface and implement `getKeySchema`,`getValueSchema` and `getKeyValueEncodingType`

Review comment:
       ```suggestion
   - It must implement {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java} interface and implement `getKeySchema`,`getValueSchema`, and `getKeyValueEncodingType`
   ```
   
   Capitalize the first letter of the first word in the list. Please check all.

##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are consuming from you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema then you go with the special GenericObject interface.

Review comment:
       same comment as previously

##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are consuming from you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema then you go with the special GenericObject interface.
+
+```
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        ....
+    }
+}
+```
+
+In case of AVRO, JSON and Protobuf records (schemaType=AVRO,JSON,PROTOBUF_NATIVE) you can cast the
+`genericObject` variable to `GenericRecord` and use `getFields()` and `getField()` API.
+You are able to access the native AVRO record using  `genericObject.getNativeObject()`.
+
+In case of KeyValue type you can access both the Schema for the Key and the schema for the Value using this code

Review comment:
       ```suggestion
   In the case of KeyValue type, you can access both the schema for the key and the schema for the value using this code.
   ```

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information

Review comment:
       ```suggestion
   ## Handle schema information
   ```

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySource implements Source<String> {
+    public Record<String> read() {}
+}
+```
+If you want to implement a Source that works with any schema then you go with `byte[]` (of `ByteBuffer`) and use Schema.AUTO_PRODUCE_BYTES().
+
+```
+public class MySource implements Source<byte[]> {
+    public Record<byte[]> read() {
+        
+        Schema wantedSchema = ....
+        Record<byte[]> myRecord = new MyRecordImplementation(); 
+        ....
+    }
+    class MyRecordImplementation implements Record<byte[]> {
+         public byte[] getValue() {
+            return ....encoded byte[]...that represents the value 
+         }
+         public Schema<byte[]> getSchema() {
+             return Schema.AUTO_PRODUCE_BYTES(wantedSchema);
+         }
+    }
+}
+```
+
+In order to handle properly the KeyValue type your Record implementation must follow this convention:
+- it must implement {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java} interface and implement `getKeySchema`,`getValueSchema` and `getKeyValueEncodingType`
+- it must return a `KeyValue` object as `Record.getValue()`
+- it may return null in `Record.getSchema()`
+
+When Pulsar IO runtime encounters a `KVRecord` it will automatically:

Review comment:
       ```suggestion
   When Pulsar IO runtime encounters a `KVRecord`, it brings the following changes automatically:
   ```

##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are consuming from you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema then you go with the special GenericObject interface.
+
+```
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        ....
+    }
+}
+```
+
+In case of AVRO, JSON and Protobuf records (schemaType=AVRO,JSON,PROTOBUF_NATIVE) you can cast the

Review comment:
       ```suggestion
   In the case of AVRO, JSON, and Protobuf records (schemaType=AVRO,JSON,PROTOBUF_NATIVE), you can cast the
   ```

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySource implements Source<String> {
+    public Record<String> read() {}
+}
+```
+If you want to implement a Source that works with any schema then you go with `byte[]` (of `ByteBuffer`) and use Schema.AUTO_PRODUCE_BYTES().

Review comment:
       ```suggestion
   If you want to implement a source that works with any schema, you can go with `byte[]` (of `ByteBuffer`) and use Schema.AUTO_PRODUCE_BYTES().
   ```

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySource implements Source<String> {
+    public Record<String> read() {}
+}
+```
+If you want to implement a Source that works with any schema then you go with `byte[]` (of `ByteBuffer`) and use Schema.AUTO_PRODUCE_BYTES().
+
+```
+public class MySource implements Source<byte[]> {
+    public Record<byte[]> read() {
+        
+        Schema wantedSchema = ....
+        Record<byte[]> myRecord = new MyRecordImplementation(); 
+        ....
+    }
+    class MyRecordImplementation implements Record<byte[]> {
+         public byte[] getValue() {
+            return ....encoded byte[]...that represents the value 
+         }
+         public Schema<byte[]> getSchema() {
+             return Schema.AUTO_PRODUCE_BYTES(wantedSchema);
+         }
+    }
+}
+```
+
+In order to handle properly the KeyValue type your Record implementation must follow this convention:

Review comment:
       ```suggestion
   To handle the `KeyValue` type properly, follow the guidelines for your record implementation:
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#discussion_r618124715



##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are consuming from you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema then you go with the special GenericObject interface.
+
+```
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        ....
+    }
+}
+```
+
+In case of AVRO, JSON and Protobuf records (schemaType=AVRO,JSON,PROTOBUF_NATIVE) you can cast the
+`genericObject` variable to `GenericRecord` and use `getFields()` and `getField()` API.
+You are able to access the native AVRO record using  `genericObject.getNativeObject()`.
+
+In case of KeyValue type you can access both the Schema for the Key and the schema for the Value using this code

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#discussion_r618123514



##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySource implements Source<String> {
+    public Record<String> read() {}
+}
+```
+If you want to implement a Source that works with any schema then you go with `byte[]` (of `ByteBuffer`) and use Schema.AUTO_PRODUCE_BYTES().

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#issuecomment-825280244


   @eolivelli thanks for your contribution!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#discussion_r618123089



##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are consuming from you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema then you go with the special GenericObject interface.
+
+```
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        ....
+    }
+}
+```
+
+In case of AVRO, JSON and Protobuf records (schemaType=AVRO,JSON,PROTOBUF_NATIVE) you can cast the
+`genericObject` variable to `GenericRecord` and use `getFields()` and `getField()` API.
+You are able to access the native AVRO record using  `genericObject.getNativeObject()`.
+
+In case of KeyValue type you can access both the Schema for the Key and the schema for the Value using this code
+
+```
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        
+        KeyValue keyValue = (KeyValue) nativeObject;        
+        Object key = keyValue.getKey();
+        Object value = keyValue.getValue();
+        
+        KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
+        Schema keySchema = keyValueSchema.getKeySchema();
+        Schema valueSchema = keyValueSchema.getValueSchema();        

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#discussion_r618123321



##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#issuecomment-822885355


   @eolivelli many thanks for your contribution! I've left some comments, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10267: Add documentation about Sink and Source

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#discussion_r618124529



##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are consuming from you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema then you go with the special GenericObject interface.

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org