Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/08/01 09:14:23 UTC

[GitHub] [pulsar] Anonymitaet commented on a change in pull request #4845: [Doc] Add *Manage Schema* Section

Anonymitaet commented on a change in pull request #4845: [Doc] Add *Manage Schema* Section
URL: https://github.com/apache/pulsar/pull/4845#discussion_r309599616
 
 

 ##########
 File path: site2/docs/schema-manage.md
 ##########
 @@ -0,0 +1,497 @@
+---
+id: schema-manage
+title: Manage schema
+sidebar_label: Manage schema
+---
+
+## Schema AutoUpdate
+
+By default, if a schema passes the schema compatibility check, the Pulsar producer automatically updates the topic it produces to with this schema.
+
+### AutoUpdate for producer
+
+For a producer, the `AutoUpdate` happens in the following cases:
+
+* If a **topic doesn’t have a schema**, Pulsar registers a schema automatically.
+
+* If a **topic has a schema**:
+
+  * If a **producer doesn’t carry a schema**:
+
+    * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **disabled** in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data. 
+    
+    * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **enabled** in the namespace to which the topic belongs, the producer is rejected and disconnected.
+
+  * If a **producer carries a schema**:
+  
+    A broker performs the compatibility check based on the configured compatibility check strategy of the namespace to which the topic belongs. 
+    
+    * If it is a new schema and it passes the compatibility check, the broker registers a new schema automatically for the topic.
+
+    * If the schema does not pass the compatibility check, the broker does not register a schema.
+
+![AutoUpdate Producer](assets/schema-autoupdate-producer.png)
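+
+The producer rules above can be summarized as a small decision function. The following Java code is a minimal sketch for illustration, not Pulsar's broker implementation. `ProducerOutcome`, `ProducerAutoUpdate`, and `decide` are hypothetical names. A topic's schema is simplified to a single string instead of a list of versions, and the `isCompatible` predicate stands in for the compatibility check strategy configured on the namespace.
+
+```java
+import java.util.function.BiPredicate;
+
+/** Possible outcomes when a producer connects (hypothetical names for illustration). */
+enum ProducerOutcome {
+    REGISTER_SCHEMA, // the broker registers a schema for the topic
+    CONNECT,         // the producer connects; no schema is registered
+    REJECT,          // the producer is rejected and disconnected
+    NOT_REGISTERED   // the schema fails the compatibility check; nothing is registered
+}
+
+final class ProducerAutoUpdate {
+    private ProducerAutoUpdate() {}
+
+    /**
+     * @param topicSchema        schema registered on the topic, or null if it has none
+     * @param producerSchema     schema carried by the producer, or null if it carries none
+     * @param validationEnforced whether schemaValidationEnforced is enabled on the namespace
+     * @param isCompatible       stand-in for the namespace's compatibility check strategy,
+     *                           called as isCompatible.test(existing, candidate)
+     */
+    static ProducerOutcome decide(String topicSchema,
+                                  String producerSchema,
+                                  boolean validationEnforced,
+                                  BiPredicate<String, String> isCompatible) {
+        if (topicSchema == null) {
+            // The topic has no schema: a schema is registered automatically.
+            return ProducerOutcome.REGISTER_SCHEMA;
+        }
+        if (producerSchema == null) {
+            // A producer without a schema is only allowed when validation is not enforced.
+            return validationEnforced ? ProducerOutcome.REJECT : ProducerOutcome.CONNECT;
+        }
+        if (producerSchema.equals(topicSchema)) {
+            // Not a new schema, so there is nothing to register.
+            return ProducerOutcome.CONNECT;
+        }
+        // A new schema is registered only if it passes the compatibility check.
+        return isCompatible.test(topicSchema, producerSchema)
+                ? ProducerOutcome.REGISTER_SCHEMA
+                : ProducerOutcome.NOT_REGISTERED;
+    }
+}
+```
+
+For example, `decide("v1", null, true, isCompatible)` returns `REJECT`, which matches a producer without a schema connecting to a topic in a namespace that enforces schema validation.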
+
+### AutoUpdate for consumer
+
+For a consumer, the `AutoUpdate` happens in the following cases:
+
+* If a **consumer connects to a topic without a schema** (that is, the consumer receives raw bytes), the consumer can connect to the topic successfully without any compatibility check.
+
+* If a **consumer connects to a topic with a schema**:
+
+  * If the **topic is idle** (no producers, no entries, no other consumers and no registered schemas), the broker registers the schema for the topic automatically.
+
+  * If the **topic is not idle**, the broker checks whether the schema provided by the consumer is compatible with the registered schema of the topic.
+    
+    * If the **schema passes the compatibility check**, the consumer can connect to the topic and receive messages. 
+    
+    * If the **schema does not pass the compatibility check**, the consumer is rejected and disconnected.
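+
+In the same spirit, the consumer rules can be sketched as follows. This Java code is a hypothetical model, not the broker's code. `ConsumerOutcome` and `ConsumerAutoUpdate` are made-up names, whether the topic is idle is passed in as a flag instead of being derived from its producers, entries, consumers, and schemas, and `isCompatible` stands in for the compatibility check. The sketch also assumes that a non-idle topic always has a registered schema.
+
+```java
+import java.util.Objects;
+import java.util.function.BiPredicate;
+
+/** Possible outcomes when a consumer connects (hypothetical names). */
+enum ConsumerOutcome {
+    CONNECT,              // the consumer connects; no schema is registered
+    REGISTER_AND_CONNECT, // idle topic: the broker registers the consumer's schema
+    REJECT                // the consumer is rejected and disconnected
+}
+
+final class ConsumerAutoUpdate {
+    private ConsumerAutoUpdate() {}
+
+    /**
+     * @param consumerSchema schema carried by the consumer, or null if it receives raw bytes
+     * @param topicIdle      true if the topic has no producers, entries, other consumers, or registered schemas
+     * @param topicSchema    schema registered on the topic; assumed non-null when the topic is not idle
+     * @param isCompatible   stand-in for the compatibility check, called as isCompatible.test(registered, candidate)
+     */
+    static ConsumerOutcome decide(String consumerSchema, boolean topicIdle,
+                                  String topicSchema, BiPredicate<String, String> isCompatible) {
+        if (consumerSchema == null) {
+            // A raw-bytes consumer connects without any compatibility check.
+            return ConsumerOutcome.CONNECT;
+        }
+        if (topicIdle) {
+            return ConsumerOutcome.REGISTER_AND_CONNECT;
+        }
+        Objects.requireNonNull(topicSchema, "a non-idle topic is assumed to have a registered schema");
+        return isCompatible.test(topicSchema, consumerSchema)
+                ? ConsumerOutcome.CONNECT
+                : ConsumerOutcome.REJECT;
+    }
+}
+```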
+
+### Manage AutoUpdate strategy
+
+You can use the `pulsar-admin` command to manage the `AutoUpdate` strategy in the following ways:
+
+* Disable AutoUpdate
+
+* Adjust compatibility
+
+#### Disable AutoUpdate 
+
+To disable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command.
+
+```
+bin/pulsar-admin namespaces set-schema-autoupdate-strategy --disabled tenant/namespace
+```
+
+Once the `AutoUpdate` is disabled, you can only register a new schema using the `pulsar-admin` command.
+
+#### Adjust compatibility
+
+To adjust the schema compatibility level on a namespace, you can use the `pulsar-admin` command.
+
+```
+bin/pulsar-admin namespaces set-schema-autoupdate-strategy --compatibility <compatibility-level> tenant/namespace
+```
+
+### Schema validation
+
+By default, `schemaValidationEnforced` is **disabled** for producers:
+
+* This means a producer without a schema can produce any kind of messages to a topic with schemas, which may result in producing garbage data to the topic.
+
+* This allows non-Java language clients that don’t support schemas to produce messages to a topic with schemas.
+
+However, if you want a stronger guarantee on the topics with schemas, you can enable `schemaValidationEnforced` across the whole cluster or on a per-namespace basis.
+
+#### Enable schema validation
+
+To enable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
+
+```
+bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
+```
+
+#### Disable schema validation
+
+To disable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
+
+```
+bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
+```
+
+## Schema manual management
+
+To manage schemas, you can use one of the following methods:
+
+* **Admin CLI** (`pulsar-admin` CLI)
+  
+* **REST API** (Pulsar exposes schema-related management APIs in its admin RESTful API, so you can access the admin RESTful endpoints directly to manage schemas)
+
+* **Java Admin API** (use Java admin library provided by Pulsar)
+
+### Upload a schema
+
+To upload (register) a new schema for a topic, you can use one of the following methods: 
+
+* **Admin CLI**
+
+* **REST API**
+
+* **Java Admin API** 
+
+#### Admin CLI
+
+To upload (register) a new schema for a topic, you can use the `upload` subcommand.
+
+```text
+$ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
+```
+
+The `schema-definition-file` is in JSON format.
+
+```text
+{
+    "type": "<schema-type>",
+    "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+```
+
+The `schema-definition-file` includes the following fields:
+
+Field | Description |
+---|---|
+`type` | The schema type. | 
+`schema` | The schema definition data, which is encoded in UTF-8. <br>If the schema is a **primitive** schema, this field should be blank. <br>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
+`properties` | The additional properties associated with the schema. |
+
+Here are some examples of the `schema-definition-file`: Example 1 defines a `JSON` schema and Example 2 defines a `STRING` schema.
+
+**Example 1**
+
+```text
+{
+    "type": "JSON",
+    "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":\"string\",\"default\":null},{\"name\":\"file3\",\"type\":[\"null\",\"string\"],\"default\":\"dfdf\"}]}",
+    "properties": {}
+}
+```
+
+**Example 2**
+
+```text
+{
+    "type": "STRING",
+    "schema": "",
+    "properties": {
+        "key1": "value1"
+    }
+}
+```
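+
+Escaping the Avro definition by hand, as in Example 1, is error-prone. The following Java sketch renders a `schema-definition-file` from a schema type, a schema definition, and a property map, escaping the definition into a JSON string. `SchemaDefinitionFile` is a hypothetical helper written for illustration; in practice a JSON library would normally do the escaping.
+
+```java
+import java.util.Map;
+import java.util.TreeMap;
+
+/** Hypothetical helper that renders the content of a schema-definition-file. */
+final class SchemaDefinitionFile {
+    private SchemaDefinitionFile() {}
+
+    /** Escapes a value and wraps it in double quotes so it is a valid JSON string. */
+    static String quote(String value) {
+        StringBuilder sb = new StringBuilder("\"");
+        for (char c : value.toCharArray()) {
+            switch (c) {
+                case '"':  sb.append("\\\""); break;
+                case '\\': sb.append("\\\\"); break;
+                case '\n': sb.append("\\n"); break;
+                case '\r': sb.append("\\r"); break;
+                case '\t': sb.append("\\t"); break;
+                default:
+                    if (c < 0x20) {
+                        // Other control characters become four-digit hex escapes.
+                        sb.append(String.format("\\u%04x", (int) c));
+                    } else {
+                        sb.append(c);
+                    }
+            }
+        }
+        return sb.append('"').toString();
+    }
+
+    /**
+     * @param type       the schema type, for example "STRING" or "JSON"
+     * @param definition the Avro schema definition for struct schemas, or "" for primitive schemas
+     * @param properties additional properties; sorted only to keep the output stable
+     */
+    static String build(String type, String definition, Map<String, String> properties) {
+        StringBuilder props = new StringBuilder("{");
+        String separator = "";
+        for (Map.Entry<String, String> e : new TreeMap<>(properties).entrySet()) {
+            props.append(separator).append(quote(e.getKey())).append(": ").append(quote(e.getValue()));
+            separator = ", ";
+        }
+        props.append('}');
+        return "{\n"
+                + "    \"type\": " + quote(type) + ",\n"
+                + "    \"schema\": " + quote(definition) + ",\n"
+                + "    \"properties\": " + props + "\n"
+                + "}\n";
+    }
+}
+```
+
+For example, `SchemaDefinitionFile.build("STRING", "", Map.of("key1", "value1"))` produces a file equivalent to Example 2, which you could save with `Files.writeString` and pass to `pulsar-admin schemas upload --filename`.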
+
+#### REST API
+
+To upload (register) a new schema for a topic, you can send a `POST` request to this endpoint: {@inject: endpoint|POST|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/uploadSchema}.
+
+The `POST` payload is in JSON format.
+
+```text
+{
+    "type": "<schema-type>",
+    "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+```
+
+The `POST` payload includes the following fields:
+
+Field | Description |
+---|---|
+`type` | The schema type. | 
+`schema` | The schema definition data, which is encoded in UTF-8. <br>If the schema is a **primitive** schema, this field should be blank. <br>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition of the struct.
+`properties` | The additional properties associated with the schema. |
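+
+As a sketch of calling this endpoint without the Java admin library, the following code builds the `POST` request with the JDK's `java.net.http` client (Java 11 or later). The admin URL (for example, `http://localhost:8080`), the absence of authentication, and the `SchemaUploadRequest` class name are assumptions for illustration; tenant, namespace, and topic names are not URL-encoded here.
+
+```java
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+
+final class SchemaUploadRequest {
+    private SchemaUploadRequest() {}
+
+    /** Builds a POST to /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema. */
+    static HttpRequest build(String adminUrl, String tenant, String namespace,
+                             String topic, String payloadJson) {
+        URI uri = URI.create(adminUrl + "/admin/v2/schemas/"
+                + tenant + "/" + namespace + "/" + topic + "/schema");
+        return HttpRequest.newBuilder(uri)
+                .header("Content-Type", "application/json")
+                .POST(HttpRequest.BodyPublishers.ofString(payloadJson))
+                .build();
+    }
+
+    /** Sends the request and returns the HTTP status code (no authentication is configured). */
+    static int send(HttpRequest request) throws IOException, InterruptedException {
+        HttpResponse<String> response = HttpClient.newHttpClient()
+                .send(request, HttpResponse.BodyHandlers.ofString());
+        return response.statusCode();
+    }
+}
+```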
+
+#### Java Admin API
+
+To upload (register) a new schema for a topic, you can use the following method of the `Schemas` interface, which is returned by `PulsarAdmin.schemas()`.
+
+```text
+void createSchema(String topic, PostSchemaPayload schemaPayload)
+```
+
+The `PostSchemaPayload` includes the following fields:
+
+Field | Description |
+---|---|
+`type` | The schema type. | 
+`schema` | The schema definition data, which is encoded in UTF-8. <br>If the schema is a **primitive** schema, this field should be blank. <br>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
+`properties` | The additional properties associated with the schema. |
+
+Here is an example of `PostSchemaPayload`:
+
+```text
+PulsarAdmin admin = …;
+
+PostSchemaPayload payload = new PostSchemaPayload();
+payload.setType("INT8");
+payload.setSchema("");
+
+// createSchema is defined on the Schemas interface returned by admin.schemas()
+admin.schemas().createSchema("my-tenant/my-ns/my-topic", payload);
+```
+
+### Get a schema (latest)
+
+To get the latest schema for a topic, you can use one of the following methods: 
+
+* **Admin CLI**
+
+* **REST API**
+
+* **Java Admin API** 
+
+#### Admin CLI
+
+To get the latest schema for a topic, you can use the `get` subcommand.
+
+```text
+$ pulsar-admin schemas get <topic-name>
+
+{
+    "version": 0,
+    "type": "String",
+    "timestamp": 0,
+    "data": "string",
+    "properties": {
+        "property1": "string",
+        "property2": "string"
+    }
+}
+```
+
+#### REST API
+
+To get the latest version of a schema, you can send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/getSchema}.
+
+Here is an example of a response, which is returned in a JSON format.
+
+```text
+{
+    "version": "<the-version-number-of-the-schema>",
+    "type": "<the-schema-type>",
+    "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
+    "data": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+```
+
+The response includes the following fields:
+
+Field | Description |
+---|---|
+`version` | The schema version, which is a long number | 
+`type` | The schema type |
+`timestamp` | The timestamp of creating this version of schema |
+`data` | The schema definition data, which is encoded in UTF-8. <br>If the schema is a **primitive** schema, this field should be blank. <br>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
+`properties` | The additional properties associated with the schema. |
+
+#### Java Admin API
+
+To get the latest schema for a topic, you can use the following method.
+
+```text
+SchemaInfo getSchemaInfo(String topic)
+```
+
+The `SchemaInfo` includes the following fields:
+
+Field | Description |
+---|---|
+`name` | The schema name | 
+`type` | The schema type |
+`schema` | A byte array of the schema definition data, which is encoded in UTF-8. <br>If the schema is a **primitive** schema, this byte array should be empty. <br>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
+`properties` | The additional properties associated with the schema. |
+
+Here is an example of getting the latest `SchemaInfo` of a topic:
+
+```text
+PulsarAdmin admin = …;
+
+SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic");
+```
+
+### Get a schema (specific)
+
+To get a specific version of a schema, you can use one of the following methods: 
+
+* **Admin CLI**
+
+* **REST API**
+
+* **Java Admin API** 
+
+#### Admin CLI
+
+To get a specific version of a schema, you can use the `get` subcommand.
+
+```text
+$ pulsar-admin schemas get <topic-name> --version <version> 
+```
+
+#### REST API
+
+To get a specific version of a schema, you can send a `GET` request to a schema endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema/:version|operation/getSchema}.
+
+Here is an example of a response, which is returned in a JSON format.
+
+```text
+{
+    "version": "<the-version-number-of-the-schema>",
+    "type": "<the-schema-type>",
+    "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
+    "data": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+```
+
+The response includes the following fields:
+
+Field | Description |
+---|---|
+`version` | The schema version, which is a long number. | 
+`type` | The schema type. |
+`timestamp` | The timestamp of creating this version of schema. |
+`data` | The schema definition, which is encoded in UTF-8. <br>If the schema is a **primitive** schema, this field should be blank. <br>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
+`properties` | The additional properties associated with the schema. |
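+
+The following Java sketch builds the `GET` request for both the latest-version endpoint and the versioned endpoint with the JDK's `java.net.http` client (Java 11 or later). The `SchemaGetRequest` class name and the admin URL you pass in are assumptions for illustration; the request can be sent with `HttpClient.send` and the JSON body parsed with any JSON library.
+
+```java
+import java.net.URI;
+import java.net.http.HttpRequest;
+import java.util.OptionalLong;
+
+final class SchemaGetRequest {
+    private SchemaGetRequest() {}
+
+    /**
+     * Builds a GET for the latest schema, or for a specific version when one is given:
+     * /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema[/{version}]
+     */
+    static HttpRequest build(String adminUrl, String tenant, String namespace,
+                             String topic, OptionalLong version) {
+        StringBuilder path = new StringBuilder(adminUrl)
+                .append("/admin/v2/schemas/")
+                .append(tenant).append('/')
+                .append(namespace).append('/')
+                .append(topic).append("/schema");
+        // Append the version segment only when a specific version is requested.
+        version.ifPresent(v -> path.append('/').append(v));
+        return HttpRequest.newBuilder(URI.create(path.toString())).GET().build();
+    }
+}
+```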
+
+#### Java Admin API
+
+To get a specific version of a schema, you can use the following method.
+
+```text
+SchemaInfo getSchemaInfo(String topic, long version)
+```
+
+The `SchemaInfo` includes the following fields:
+
+Field | Description |
+---|---|
+`name` | The schema name. | 
+`type` | The schema type. |
+`schema` | A byte array of the schema definition data, which is encoded in UTF-8. <br>If the schema is a **primitive** schema, this byte array should be empty. <br>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
+`properties` | The additional properties associated with the schema. |
+
+Here is an example of getting version 1 of the `SchemaInfo` of a topic:
+
+```text
+PulsarAdmin admin = …;
+
+SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic", 1L);
+```
+
+### Delete a schema
+
+To delete a schema for a topic, you can use one of the following methods: 
+
+* **Admin CLI**
+
+* **REST API**
+
+* **Java Admin API** 
+
+> #### Note
+> 
+> In any case, the **delete** action deletes **all versions** of a schema registered for a topic.
+
+#### Admin CLI
+
+To delete all versions of a schema, you can use the `delete` subcommand.
+
+```text
+$ pulsar-admin schemas delete <topic-name>
+```
+
+#### REST API
+
+To delete all versions of a schema, you can send a `DELETE` request to a schema endpoint: {@inject: endpoint|DELETE|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/deleteSchema}.
+
+Here is an example of a response, which is returned in a JSON format.
+
+```text
+{
+    "version": "<the-latest-version-number-of-the-schema>"
+}
+```
+
+The response includes the following field:
+
+Field | Description |
+---|---|
+`version` | The schema version, which is a long number. | 
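+
+Similarly, the following Java sketch builds the `DELETE` request for this endpoint with the JDK's `java.net.http` client (Java 11 or later). The `SchemaDeleteRequest` class name and the admin URL are assumptions for illustration, and authentication is not handled.
+
+```java
+import java.net.URI;
+import java.net.http.HttpRequest;
+
+final class SchemaDeleteRequest {
+    private SchemaDeleteRequest() {}
+
+    /** Builds a DELETE for /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema (all versions). */
+    static HttpRequest build(String adminUrl, String tenant, String namespace, String topic) {
+        URI uri = URI.create(adminUrl + "/admin/v2/schemas/"
+                + tenant + "/" + namespace + "/" + topic + "/schema");
+        return HttpRequest.newBuilder(uri).DELETE().build();
+    }
+}
+```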
+
+#### Java Admin API
+
+To delete all versions of a schema, you can use the following method.
+
+```text
+void deleteSchema(String topic)
+```
+
+Here is an example of deleting a schema.
+
+```text
+PulsarAdmin admin = …;
+
+admin.schemas().deleteSchema("my-tenant/my-ns/my-topic");
+```
+
+## Custom schema storage
+
+By default, Pulsar stores schemas of all types in [Apache BookKeeper](https://bookkeeper.apache.org) deployed alongside Pulsar. However, you can use another storage system if needed.
+
+### Implement
+
+To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement two Java interfaces: 
+
+* [`SchemaStorage`](#schemastorage-interface) 
+
+* [`SchemaStorageFactory`](#schemastoragefactory-interface)
+
+#### SchemaStorage interface
+
+The `SchemaStorage` interface has the following methods:
+
+```text
+public interface SchemaStorage {
+    // How schemas are updated
+    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
+
+    // How schemas are fetched from storage
+    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
+
+    // How schemas are deleted
+    CompletableFuture<SchemaVersion> delete(String key);
+
+    // Utility method for converting a schema version byte array to a SchemaVersion object
+    SchemaVersion versionFromBytes(byte[] version);
+
+    // Startup behavior for the schema storage client
+    void start() throws Exception;
+
+    // Shutdown behavior for the schema storage client
+    void close() throws Exception;
+}
+```
+
+> #### Tip
+> 
+> For a complete example of a **schema storage** implementation, see the [BookkeeperSchemaStorage](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java) class.
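+
+To illustrate the contract, the following Java code sketches an in-memory store that mirrors the method shapes of `SchemaStorage`. It does not implement the real interface: `LongVersion`, `StoredEntry`, and `InMemorySchemaStorage` are hypothetical stand-ins for Pulsar's `SchemaVersion`, `StoredSchema`, and a storage class, a `null` version means "latest", and the `hash` argument is ignored.
+
+```java
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Hypothetical stand-in for SchemaVersion: a version number that serializes to 8 bytes. */
+final class LongVersion {
+    final long value;
+
+    LongVersion(long value) { this.value = value; }
+
+    byte[] bytes() { return ByteBuffer.allocate(Long.BYTES).putLong(value).array(); }
+}
+
+/** Hypothetical stand-in for StoredSchema: the stored bytes plus their version. */
+final class StoredEntry {
+    final byte[] data;
+    final LongVersion version;
+
+    StoredEntry(byte[] data, LongVersion version) {
+        this.data = data;
+        this.version = version;
+    }
+}
+
+/** In-memory sketch that mirrors the SchemaStorage method shapes (not the real interface). */
+final class InMemorySchemaStorage {
+    // key (topic) -> schema data, where the list index is the version number
+    private final Map<String, List<byte[]>> schemas = new HashMap<>();
+
+    synchronized CompletableFuture<LongVersion> put(String key, byte[] value, byte[] hash) {
+        // The hash argument is ignored in this sketch.
+        List<byte[]> versions = schemas.computeIfAbsent(key, k -> new ArrayList<>());
+        versions.add(value.clone());
+        return CompletableFuture.completedFuture(new LongVersion(versions.size() - 1));
+    }
+
+    synchronized CompletableFuture<StoredEntry> get(String key, LongVersion version) {
+        List<byte[]> versions = schemas.get(key);
+        if (versions == null) {
+            return CompletableFuture.completedFuture(null); // no schema for this key
+        }
+        long index = (version == null) ? versions.size() - 1 : version.value; // null = latest
+        if (index < 0 || index >= versions.size()) {
+            return CompletableFuture.completedFuture(null); // unknown version
+        }
+        byte[] data = versions.get((int) index).clone();
+        return CompletableFuture.completedFuture(new StoredEntry(data, new LongVersion(index)));
+    }
+
+    synchronized CompletableFuture<LongVersion> delete(String key) {
+        // Deletes all versions and reports the latest version that was removed.
+        List<byte[]> removed = schemas.remove(key);
+        LongVersion latest = (removed == null) ? null : new LongVersion(removed.size() - 1);
+        return CompletableFuture.completedFuture(latest);
+    }
+
+    LongVersion versionFromBytes(byte[] version) {
+        return new LongVersion(ByteBuffer.wrap(version).getLong());
+    }
+
+    void start() {
+        // Nothing to connect to for an in-memory store.
+    }
+
+    synchronized void close() {
+        schemas.clear();
+    }
+}
+```
+
+Because such a store loses every schema when the broker restarts, it is only suitable as a starting point or for tests.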
+
+#### SchemaStorageFactory interface 
+
+The `SchemaStorageFactory` interface has the following method:
+
+```text
+public interface SchemaStorageFactory {
+    @NotNull
+    SchemaStorage create(PulsarService pulsar) throws Exception;
+}
+```
+
+> #### Tip
+> 
+> For a complete example of a **schema storage factory** implementation, see the [BookkeeperSchemaStorageFactory](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java) class.
+
+### Deploy
+
+To use your custom schema storage implementation, perform the following steps.
+
+1. Package the implementation in a [JAR](https://docs.oracle.com/javase/tutorial/deployment/jar/basicsindex.html) file.
+   
+2. Add the JAR file to the `lib` folder in your Pulsar binary or source distribution.
+   
+3. Change the `schemaRegistryStorageClassName` configuration in `broker.conf` to the fully qualified class name of your custom factory class.
+
+    Note that `schemaRegistryStorageClassName` must point to your `SchemaStorageFactory` implementation, not to your `SchemaStorage` implementation.
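+
+The following Java sketch models the loading step to show why the setting must name the factory. It assumes, for illustration, that the broker instantiates the configured class through its public no-argument constructor and then calls `create` on it; `Storage`, `StorageFactory`, `MyStorage`, `MyStorageFactory`, and `StorageLoader` are hypothetical stand-ins, not Pulsar types.
+
+```java
+/** Hypothetical stand-in for SchemaStorage. */
+interface Storage {
+    String name();
+}
+
+/** Hypothetical stand-in for SchemaStorageFactory. */
+interface StorageFactory {
+    Storage create();
+}
+
+/** Example storage implementation (hypothetical). */
+final class MyStorage implements Storage {
+    public MyStorage() {}
+
+    @Override
+    public String name() { return "my-storage"; }
+}
+
+/** Example factory implementation: this is the class name that belongs in the configuration. */
+final class MyStorageFactory implements StorageFactory {
+    public MyStorageFactory() {}
+
+    @Override
+    public Storage create() { return new MyStorage(); }
+}
+
+final class StorageLoader {
+    private StorageLoader() {}
+
+    /** Instantiates the configured class and, if it is a factory, asks it for a storage. */
+    static Storage load(String configuredClassName) {
+        Object instance;
+        try {
+            instance = Class.forName(configuredClassName).getDeclaredConstructor().newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException("cannot instantiate " + configuredClassName, e);
+        }
+        if (!(instance instanceof StorageFactory)) {
+            // Configuring the storage class instead of the factory ends up here.
+            throw new IllegalArgumentException(configuredClassName + " is not a storage factory");
+        }
+        return ((StorageFactory) instance).create();
+    }
+}
+```
+
+In this sketch, loading `MyStorageFactory` succeeds, while loading `MyStorage` fails with an `IllegalArgumentException`.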
+      
 
 Review comment:
   OK
