Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/20 00:39:20 UTC

[GitHub] [pulsar] Anonymitaet commented on a change in pull request #10267: Add documentation about Sink and Source

Anonymitaet commented on a change in pull request #10267:
URL: https://github.com/apache/pulsar/pull/10267#discussion_r616261206



##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.

Review comment:
       ```suggestion
   Pulsar IO automatically handles the schema and provides a strongly typed API based on Java generics.
   ```

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.

Review comment:
       ```suggestion
   If you know the schema type that you are producing, you can declare the Java class corresponding to that type in your source declaration.
   ```
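The typed case can be sketched as a small compilable example. `Record` and `Source` below are simplified local stand-ins for `org.apache.pulsar.functions.api.Record` and `org.apache.pulsar.io.core.Source`, so the snippet runs without Pulsar on the classpath. The real `Source` also declares `open(...)` and `close()`, and a real `read()` blocks until data is available. `SimpleRecord` and the in-memory queue are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified stand-in for org.apache.pulsar.functions.api.Record (not the real API).
interface Record<T> {
    T getValue();
}

// Simplified stand-in for org.apache.pulsar.io.core.Source; the real interface
// also declares open(...) and close(), which are omitted here.
interface Source<T> {
    Record<T> read() throws Exception;
}

// Hypothetical record type that only carries a value.
final class SimpleRecord<T> implements Record<T> {
    private final T value;

    SimpleRecord(T value) {
        this.value = value;
    }

    @Override
    public T getValue() {
        return value;
    }
}

// Typed source: the type argument String fixes what read() returns.
class MySource implements Source<String> {
    // Hypothetical in-memory data standing in for an external system.
    private final Deque<String> pending = new ArrayDeque<>();

    MySource(String... lines) {
        for (String line : lines) {
            pending.add(line);
        }
    }

    // Overriding without "throws Exception" is legal and keeps callers simple.
    @Override
    public Record<String> read() {
        // A real Pulsar source blocks until data arrives; this sketch
        // returns null once the queue is empty.
        String next = pending.poll();
        return next == null ? null : new SimpleRecord<>(next);
    }
}
```

Because the class is declared as `Source<String>`, `read()` returns `Record<String>` and callers need no casts; per the text under review, Pulsar IO then takes care of the schema for that type.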

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySource implements Source<String> {
+    public Record<String> read() {}
+}
+```
+If you want to implement a Source that works with any schema then you go with `byte[]` (of `ByteBuffer`) and use Schema.AUTO_PRODUCE_BYTES().
+
+```
+public class MySource implements Source<byte[]> {
+    public Record<byte[]> read() {
+        
+        Schema wantedSchema = ....
+        Record<byte[]> myRecord = new MyRecordImplementation(); 
+        ....
+    }
+    class MyRecordImplementation implements Record<byte[]> {
+         public byte[] getValue() {
+            return ....encoded byte[]...that represents the value 
+         }
+         public Schema<byte[]> getSchema() {
+             return Schema.AUTO_PRODUCE_BYTES(wantedSchema);
+         }
+    }
+}
+```
+
+In order to handle properly the KeyValue type your Record implementation must follow this convention:
+- it must implement {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java} interface and implement `getKeySchema`,`getValueSchema` and `getKeyValueEncodingType`

Review comment:
       ```suggestion
   - It must implement the {@inject: github:`KVRecord`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java} interface and implement `getKeySchema`, `getValueSchema`, and `getKeyValueEncodingType`.
   ```
   
   Capitalize the first letter of each list item. Please check all lists.

##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are consuming from you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema then you go with the special GenericObject interface.

Review comment:
       Same comment as previously (lowercase "sink", and a comma after "any schema").
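For the any-schema sink, dispatching on `GenericObject.getSchemaType()` can be sketched as below. `SchemaType` (a small subset of the real enum), `GenericObject`, `Record`, and `Sink` are simplified stand-ins modeled on the Pulsar types, not the real API. The sketch assumes, for illustration only, that a `STRING` value's native object is a `java.lang.String`; the `written` list is a hypothetical substitute for an external system.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins modeled on the Pulsar schema and IO types (not the real API).
enum SchemaType { STRING, AVRO, JSON, PROTOBUF_NATIVE, KEY_VALUE }

interface GenericObject {
    SchemaType getSchemaType();
    Object getNativeObject();
}

interface Record<T> {
    T getValue();
}

interface Sink<T> {
    void write(Record<T> record) throws Exception;
}

// A sink that accepts any schema and branches on the runtime schema type.
class MySink implements Sink<GenericObject> {
    // Hypothetical destination: descriptions of what was written.
    final List<String> written = new ArrayList<>();

    @Override
    public void write(Record<GenericObject> record) {
        GenericObject genericObject = record.getValue();
        SchemaType type = genericObject.getSchemaType();
        Object nativeObject = genericObject.getNativeObject();
        switch (type) {
            case STRING:
                // Assumption: the native object of a STRING value is a String.
                written.add("string:" + nativeObject);
                break;
            case AVRO:
            case JSON:
            case PROTOBUF_NATIVE:
                // Structured types can be cast to GenericRecord for field
                // access; that step is not modeled here.
                written.add("struct:" + type);
                break;
            default:
                written.add("other:" + type);
        }
    }
}
```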

##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are consuming from you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema then you go with the special GenericObject interface.
+
+```
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        ....
+    }
+}
+```
+
+In case of AVRO, JSON and Protobuf records (schemaType=AVRO,JSON,PROTOBUF_NATIVE) you can cast the
+`genericObject` variable to `GenericRecord` and use `getFields()` and `getField()` API.
+You are able to access the native AVRO record using  `genericObject.getNativeObject()`.
+
+In case of KeyValue type you can access both the Schema for the Key and the schema for the Value using this code

Review comment:
       ```suggestion
   In the case of KeyValue type, you can access both the schema for the key and the schema for the value using this code.
   ```
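The code this sentence refers to is not included in this excerpt. As an assumption, the sketch below models a key/value schema as a `KeyValueSchema` type exposing `getKeySchema()` and `getValueSchema()`, and checks for it with `instanceof` before casting. `Schema`, `SchemaType`, `KeyValue`, and `KeyValueSchema` are simplified stand-ins, so check the Pulsar API for the real type and package. Inside a sink's `write`, the hypothetical `describe` helper would be called with `record.getSchema()`.

```java
// Simplified stand-ins (not the real Pulsar API).
enum SchemaType { STRING, INT32, KEY_VALUE }

interface Schema<T> {
    SchemaType getType();
}

final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Assumed shape of a key/value schema: it exposes the two component schemas.
interface KeyValueSchema<K, V> extends Schema<KeyValue<K, V>> {
    Schema<K> getKeySchema();
    Schema<V> getValueSchema();
}

final class KeyValueSchemas {
    // Returns "KEYTYPE=VALUETYPE" for a key/value schema, or null otherwise.
    static String describe(Schema<?> schema) {
        if (schema instanceof KeyValueSchema) {
            KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) schema;
            return kvSchema.getKeySchema().getType() + "=" + kvSchema.getValueSchema().getType();
        }
        return null;
    }
}
```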

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information

Review comment:
       ```suggestion
   ## Handle schema information
   ```

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySource implements Source<String> {
+    public Record<String> read() {}
+}
+```
+If you want to implement a Source that works with any schema then you go with `byte[]` (of `ByteBuffer`) and use Schema.AUTO_PRODUCE_BYTES().
+
+```
+public class MySource implements Source<byte[]> {
+    public Record<byte[]> read() {
+        
+        Schema wantedSchema = ....
+        Record<byte[]> myRecord = new MyRecordImplementation(); 
+        ....
+    }
+    class MyRecordImplementation implements Record<byte[]> {
+         public byte[] getValue() {
+            return ....encoded byte[]...that represents the value 
+         }
+         public Schema<byte[]> getSchema() {
+             return Schema.AUTO_PRODUCE_BYTES(wantedSchema);
+         }
+    }
+}
+```
+
+In order to handle properly the KeyValue type your Record implementation must follow this convention:
+- it must implement {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java} interface and implement `getKeySchema`,`getValueSchema` and `getKeyValueEncodingType`
+- it must return a `KeyValue` object as `Record.getValue()`
+- it may return null in `Record.getSchema()`
+
+When Pulsar IO runtime encounters a `KVRecord` it will automatically:

Review comment:
       ```suggestion
   When the Pulsar IO runtime encounters a `KVRecord`, it automatically performs the following steps:
   ```
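The record conventions listed in this hunk can be sketched as follows. The example implements `KVRecord`, exposes the key schema, the value schema, and the encoding type, returns a `KeyValue` from `getValue()`, and leaves `getSchema()` returning null. All types here are simplified stand-ins for `org.apache.pulsar.functions.api.KVRecord`, `Record`, `KeyValue`, `KeyValueEncodingType`, and `Schema`. `MyKVRecord` and the choice of `SEPARATED` are hypothetical. What the runtime does after detecting a `KVRecord` is cut off in this excerpt, so the sketch does not model it.

```java
// Simplified stand-ins modeled on the Pulsar types (not the real API).
interface Schema<T> {
    String name();
}

final class KeyValue<K, V> {
    private final K key;
    private final V value;

    KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }

    public K getKey() { return key; }
    public V getValue() { return value; }
}

enum KeyValueEncodingType { INLINE, SEPARATED }

interface Record<T> {
    T getValue();

    default Schema<T> getSchema() {
        return null;
    }
}

interface KVRecord<K, V> extends Record<KeyValue<K, V>> {
    Schema<K> getKeySchema();
    Schema<V> getValueSchema();
    KeyValueEncodingType getKeyValueEncodingType();
}

// Hypothetical record with a String key and an Integer value.
final class MyKVRecord implements KVRecord<String, Integer> {
    private final KeyValue<String, Integer> kv;
    private final Schema<String> keySchema;
    private final Schema<Integer> valueSchema;

    MyKVRecord(String key, Integer value, Schema<String> keySchema, Schema<Integer> valueSchema) {
        this.kv = new KeyValue<>(key, value);
        this.keySchema = keySchema;
        this.valueSchema = valueSchema;
    }

    // Convention: getValue() returns a KeyValue object.
    @Override
    public KeyValue<String, Integer> getValue() { return kv; }

    // Convention: key schema, value schema, and encoding type are exposed separately.
    @Override
    public Schema<String> getKeySchema() { return keySchema; }

    @Override
    public Schema<Integer> getValueSchema() { return valueSchema; }

    // Hypothetical choice of encoding, for illustration only.
    @Override
    public KeyValueEncodingType getKeyValueEncodingType() { return KeyValueEncodingType.SEPARATED; }

    // Convention: getSchema() may return null, so the default from Record is kept.
}
```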

##########
File path: site2/docs/io-develop.md
##########
@@ -123,6 +163,57 @@ Developing a sink connector **is similar to** developing a source connector, tha
 
     You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are consuming from you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySink implements Sink<String> {
+    public void write(Record<String> record) {}
+}
+```
+If you want to implement a Sink that works with any schema then you go with the special GenericObject interface.
+
+```
+public class MySink implements Sink<GenericObject> {
+    public void write(Record<GenericObject> record) {
+        Schema schema = record.getSchema();
+        GenericObject genericObject = record.getValue();
+        SchemaType type = genericObject.getSchemaType();
+        Object nativeObject = genericObject.getNativeObject();
+        ....
+    }
+}
+```
+
+In case of AVRO, JSON and Protobuf records (schemaType=AVRO,JSON,PROTOBUF_NATIVE) you can cast the

Review comment:
       ```suggestion
   In the case of AVRO, JSON, and Protobuf records (`schemaType` is `AVRO`, `JSON`, or `PROTOBUF_NATIVE`), you can cast the
   ```
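Field access after the cast might look like the sketch below. `GenericObject`, `GenericRecord`, `Field`, and `SchemaType` are simplified stand-ins modeled on the Pulsar schema API (the real `GenericRecord` also offers `getField(String)`). `FieldDumper` is a hypothetical helper that copies every field into a map and falls back to `getNativeObject()` for other schema types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins modeled on the Pulsar schema API (not the real API).
enum SchemaType { STRING, AVRO, JSON, PROTOBUF_NATIVE }

final class Field {
    private final String name;

    Field(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

interface GenericObject {
    SchemaType getSchemaType();
    Object getNativeObject();
}

interface GenericRecord extends GenericObject {
    List<Field> getFields();
    Object getField(Field field);
}

final class FieldDumper {
    // Copies the fields of a structured record into an insertion-ordered map.
    static Map<String, Object> toMap(GenericObject genericObject) {
        Map<String, Object> out = new LinkedHashMap<>();
        switch (genericObject.getSchemaType()) {
            case AVRO:
            case JSON:
            case PROTOBUF_NATIVE: {
                // Structured types: cast and walk the fields.
                GenericRecord record = (GenericRecord) genericObject;
                for (Field field : record.getFields()) {
                    out.put(field.getName(), record.getField(field));
                }
                break;
            }
            default:
                // Other types: keep the native object as a single value.
                out.put("value", genericObject.getNativeObject());
        }
        return out;
    }
}
```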

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySource implements Source<String> {
+    public Record<String> read() {}
+}
+```
+If you want to implement a Source that works with any schema then you go with `byte[]` (of `ByteBuffer`) and use Schema.AUTO_PRODUCE_BYTES().

Review comment:
       ```suggestion
   If you want to implement a source that works with any schema, you can go with `byte[]` (or `ByteBuffer`) and use `Schema.AUTO_PRODUCE_BYTES()`.
   ```
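A compilable version of the `byte[]` source might look like the sketch below. In the quoted snippet, `wantedSchema` is a local variable of `read()` but is used inside `MyRecordImplementation.getSchema()`, which does not compile; here the schema is passed to the record's constructor instead. `Schema`, `Record`, and `Source` are simplified stand-ins. The stand-in `AUTO_PRODUCE_BYTES` only records the target schema's name, whereas the real `Schema.AUTO_PRODUCE_BYTES(Schema<?>)` returns a `Schema<byte[]>` for producing already-encoded bytes. The JSON payload is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for org.apache.pulsar.client.api.Schema (not the real API).
interface Schema<T> {
    String name();

    // Stand-in for Schema.AUTO_PRODUCE_BYTES(Schema<?>): it only records which
    // schema the raw bytes are encoded with.
    static Schema<byte[]> AUTO_PRODUCE_BYTES(Schema<?> target) {
        return () -> "AUTO_PRODUCE_BYTES(" + target.name() + ")";
    }
}

// Simplified stand-ins for Record and Source.
interface Record<T> {
    T getValue();

    default Schema<T> getSchema() {
        return null;
    }
}

interface Source<T> {
    Record<T> read() throws Exception;
}

class MySource implements Source<byte[]> {
    // Schema that the produced bytes are encoded with, chosen by the connector.
    private final Schema<?> wantedSchema;

    MySource(Schema<?> wantedSchema) {
        this.wantedSchema = wantedSchema;
    }

    @Override
    public Record<byte[]> read() {
        // Hypothetical payload; a real source would encode external data
        // according to wantedSchema.
        byte[] encoded = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        return new MyRecordImplementation(encoded, wantedSchema);
    }

    // The schema is passed in explicitly rather than taken from a local
    // variable of read(), which the record's methods cannot see.
    static final class MyRecordImplementation implements Record<byte[]> {
        private final byte[] value;
        private final Schema<?> wantedSchema;

        MyRecordImplementation(byte[] value, Schema<?> wantedSchema) {
            this.value = value;
            this.wantedSchema = wantedSchema;
        }

        @Override
        public byte[] getValue() {
            return value;
        }

        @Override
        public Schema<byte[]> getSchema() {
            return Schema.AUTO_PRODUCE_BYTES(wantedSchema);
        }
    }
}
```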

##########
File path: site2/docs/io-develop.md
##########
@@ -85,6 +85,46 @@ interface, which means you need to implement the {@inject: github:`open`:/pulsar
         `ack` |Acknowledge that the record is fully processed.
         `fail`|Indicate that the record fails to be processed.
 
+## Handling Schema information
+
+Pulsar IO handles automatically the Schema and provides a strongly typed API based on Java generics.
+If you know the Schema type that you are producing you can declare the Java class relative to that type in your Sink declaration.
+
+```
+public class MySource implements Source<String> {
+    public Record<String> read() {}
+}
+```
+If you want to implement a Source that works with any schema then you go with `byte[]` (of `ByteBuffer`) and use Schema.AUTO_PRODUCE_BYTES().
+
+```
+public class MySource implements Source<byte[]> {
+    public Record<byte[]> read() {
+        
+        Schema wantedSchema = ....
+        Record<byte[]> myRecord = new MyRecordImplementation(); 
+        ....
+    }
+    class MyRecordImplementation implements Record<byte[]> {
+         public byte[] getValue() {
+            return ....encoded byte[]...that represents the value 
+         }
+         public Schema<byte[]> getSchema() {
+             return Schema.AUTO_PRODUCE_BYTES(wantedSchema);
+         }
+    }
+}
+```
+
+In order to handle properly the KeyValue type your Record implementation must follow this convention:

Review comment:
       ```suggestion
   To handle the `KeyValue` type properly, follow the guidelines for your record implementation:
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org