Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/07 02:23:31 UTC

[pulsar] branch master updated: [Doc] Add *Manage Schema* Section (#4845)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 32337fc  [Doc] Add *Manage Schema* Section (#4845)
32337fc is described below

commit 32337fcc9677d891fa8939f6f7185d598acebd95
Author: Anonymitaet <50...@users.noreply.github.com>
AuthorDate: Wed Aug 7 10:23:25 2019 +0800

    [Doc] Add *Manage Schema* Section (#4845)
    
    Structure of Schema Chapter: https://github.com/apache/pulsar/issues/4789
---
 site2/docs/assets/schema-autoupdate-consumer.png | Bin 0 -> 86897 bytes
 site2/docs/assets/schema-autoupdate-producer.png | Bin 0 -> 116544 bytes
 site2/docs/schema-manage.md                      | 785 +++++++++++++++++++++++
 site2/website/sidebars.json                      |   5 +-
 4 files changed, 789 insertions(+), 1 deletion(-)

diff --git a/site2/docs/assets/schema-autoupdate-consumer.png b/site2/docs/assets/schema-autoupdate-consumer.png
new file mode 100644
index 0000000..3f6f6a7
Binary files /dev/null and b/site2/docs/assets/schema-autoupdate-consumer.png differ
diff --git a/site2/docs/assets/schema-autoupdate-producer.png b/site2/docs/assets/schema-autoupdate-producer.png
new file mode 100644
index 0000000..b8d046c
Binary files /dev/null and b/site2/docs/assets/schema-autoupdate-producer.png differ
diff --git a/site2/docs/schema-manage.md b/site2/docs/schema-manage.md
new file mode 100644
index 0000000..b2f9f0a
--- /dev/null
+++ b/site2/docs/schema-manage.md
@@ -0,0 +1,785 @@
+---
+id: schema-manage
+title: Manage schema
+sidebar_label: Manage schema
+---
+
+Schemas can be managed in the following methods:
+
+* Automatically 
+  
+  * [Schema AutoUpdate](#schema-autoupdate)
+
+* Manually
+  
+  * [Schema manual management](#schema-manual-management)
+  
+  * [Custom schema storage](#custom-schema-storage)
+
+## Schema AutoUpdate
+
+By default, if a schema passes the schema compatibility check, a Pulsar producer automatically registers this schema for the topic it produces to.
+
+### AutoUpdate for producer
+
+For a producer, the `AutoUpdate` happens in the following cases:
+
+* If a **topic doesn’t have a schema**, Pulsar registers a schema automatically.
+
+* If a **topic has a schema**:
+
+  * If a **producer doesn’t carry a schema**:
+
+    * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **disabled** in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data. 
+    
+    * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **enabled** in the namespace to which the topic belongs, the producer is rejected and disconnected.
+
+  * If a **producer carries a schema**:
+  
+    A broker performs the compatibility check based on the configured compatibility check strategy of the namespace to which the topic belongs. 
+    
+    * If it is a new schema and it passes the compatibility check, the broker registers a new schema automatically for the topic.
+
+    * If the schema does not pass the compatibility check, the broker does not register a schema.
+
+![AutoUpdate Producer](assets/schema-autoupdate-producer.png)
+
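+The producer-side rules above can be condensed into one decision function. The following is a minimal Java sketch, assuming each broker check reduces to a boolean input. `ProducerAutoUpdateModel`, its `Outcome` enum, and their members are hypothetical names for illustration, not Pulsar API or broker code. The text does not say what happens when a producer carries a schema that is already registered, so returning `CONNECT` in that case is an assumption. `NO_REGISTRATION` mirrors the text, which only states that an incompatible schema is not registered.
+
+```java
+/** Hypothetical model of the producer-side AutoUpdate rules; not Pulsar broker code. */
+public class ProducerAutoUpdateModel {
+
+    public enum Outcome { REGISTER_SCHEMA, CONNECT, REJECT, NO_REGISTRATION }
+
+    /**
+     * @param topicHasSchema     the topic already has a registered schema
+     * @param producerHasSchema  the producer carries a schema
+     * @param validationEnforced schemaValidationEnforced is enabled for the namespace
+     * @param isNewSchema        the producer's schema is not yet registered for the topic
+     * @param compatible         the schema passes the namespace's compatibility check
+     */
+    public static Outcome decide(boolean topicHasSchema, boolean producerHasSchema,
+                                 boolean validationEnforced, boolean isNewSchema,
+                                 boolean compatible) {
+        if (!topicHasSchema) {
+            // Topic without a schema: Pulsar registers one automatically.
+            return Outcome.REGISTER_SCHEMA;
+        }
+        if (!producerHasSchema) {
+            // Producer without a schema: accepted only if validation is not enforced.
+            return validationEnforced ? Outcome.REJECT : Outcome.CONNECT;
+        }
+        if (!compatible) {
+            // The broker performs the check but does not register the schema.
+            return Outcome.NO_REGISTRATION;
+        }
+        // Assumption: a compatible new schema is registered; an existing one needs nothing.
+        return isNewSchema ? Outcome.REGISTER_SCHEMA : Outcome.CONNECT;
+    }
+}
+```
+
+For example, `decide(true, false, true, false, false)` models a producer without a schema connecting to a topic with a schema in a namespace where `schemaValidationEnforced` is enabled, so it returns `REJECT`.
+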
+### AutoUpdate for consumer
+
+For a consumer, the `AutoUpdate` happens in the following cases:
+
+* If a **consumer connects to a topic without a schema** (which means the consumer receives raw bytes), the consumer can connect to the topic successfully without any compatibility check.
+
+* If a **consumer connects to a topic with a schema**:
+
+  * If the **topic is idle** (no producers, no entries, no other consumers and no registered schemas), the broker registers a schema for the topic automatically.
+
+  * If the **topic is not idle**, the broker verifies if the schema provided by the consumer is compatible with the registered schema of the topic. 
+    
+    * If the **schema passes the compatibility check**, the consumer can connect to the topic and receive messages. 
+    
+    * If the **schema does not pass the compatibility check**, the consumer is rejected and disconnected.
+
+![AutoUpdate Consumer](assets/schema-autoupdate-consumer.png)
+
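+The consumer-side rules can be restated as code the same way. `ConsumerAutoUpdateModel` and its `Outcome` enum are hypothetical names used only to mirror the text above, and `topicIdle` stands for the idle condition (no producers, no entries, no other consumers, and no registered schemas).
+
+```java
+/** Hypothetical model of the consumer-side AutoUpdate rules; not Pulsar broker code. */
+public class ConsumerAutoUpdateModel {
+
+    public enum Outcome { CONNECT, REGISTER_SCHEMA, REJECT }
+
+    public static Outcome decide(boolean consumerHasSchema, boolean topicIdle, boolean compatible) {
+        if (!consumerHasSchema) {
+            // Raw-bytes consumer: connects without any compatibility check.
+            return Outcome.CONNECT;
+        }
+        if (topicIdle) {
+            // Idle topic: the broker registers the consumer's schema automatically.
+            return Outcome.REGISTER_SCHEMA;
+        }
+        // Otherwise the consumer's schema must be compatible with the registered schema.
+        return compatible ? Outcome.CONNECT : Outcome.REJECT;
+    }
+}
+```
+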
+### Manage AutoUpdate strategy
+
+You can use the `pulsar-admin` command to manage the `AutoUpdate` strategy as below:
+
+* [Disable AutoUpdate](#disable-autoupdate)
+
+* [Adjust compatibility](#adjust-compatibility)
+
+#### Disable AutoUpdate 
+
+To disable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command.
+
+```bash
+bin/pulsar-admin namespaces set-schema-autoupdate-strategy --disabled tenant/namespace
+```
+
+Once the `AutoUpdate` is disabled, you can only register a new schema using the `pulsar-admin` command.
+
+#### Adjust compatibility
+
+To adjust the schema compatibility level on a namespace, you can use the `pulsar-admin` command.
+
+```bash
+bin/pulsar-admin namespaces set-schema-autoupdate-strategy --compatibility <compatibility-level> tenant/namespace
+```
+
+### Schema validation
+
+By default, `schemaValidationEnforced` is **disabled** for producers:
+
+* This means a producer without a schema can produce any kind of message to a topic with schemas, which may result in garbage data being written to the topic.
+
+* This allows clients in languages other than Java that do not support schemas to produce messages to a topic with schemas.
+
+However, if you want a stronger guarantee on the topics with schemas, you can enable `schemaValidationEnforced` across the whole cluster or on a per-namespace basis.
+
+#### Enable schema validation
+
+To enable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
+
+```bash
+bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
+```
+
+#### Disable schema validation
+
+To disable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
+
+```bash
+bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
+```
+
+## Schema manual management
+
+To manage schemas, you can use one of the following methods.
+
+<table style="table">
+  <tr>
+    <th>Method</th>
+    <th>Description</th> 
+  </tr>
+  <tr>
+    <td>
+
+**Admin CLI**
+    </td>
+    <td>
+You can use the `pulsar-admin` tool to manage Pulsar schemas, brokers, clusters, sources, sinks, topics, tenants and so on.
+
+For more information about how to use the `pulsar-admin` tool, see [here](reference-pulsar-admin.md).
+    </td> 
+  </tr>
+  <tr>
+    <td>
+    
+**REST API**
+    </td>
+    <td>
+    
+Pulsar exposes schema-related management APIs in its admin REST API. You can access the admin REST endpoints directly to manage schemas.
+
+For more information about how to use the Pulsar REST API, see [here](http://pulsar.apache.org/admin-rest-api/).
+    </td>
+ </tr>
+ <tr>
+    <td>
+    
+**Java Admin API**
+    </td>
+    <td>Pulsar provides a Java admin library.</td>
+ </tr>
+</table>
+
+### Upload a schema
+
+To upload (register) a new schema for a topic, you can use one of the following methods.
+
+<!--DOCUSAURUS_CODE_TABS-->
+
+<!--Admin CLI-->
+
+Use the `upload` subcommand.
+
+```bash
+$ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
+```
+
+The `schema-definition-file` is in JSON format. 
+
+```json
+{
+    "type": "<schema-type>",
+    "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+```
+
+The `schema-definition-file` includes the following fields:
+
+<table style="table">
+  <tr>
+    <th>Field</th>
+    <th>Description</th> 
+  </tr>
+  <tr>
+    <td>
+
+`type`
+    </td>
+    <td>
+    The schema type.</td> 
+  </tr>
+  <tr>
+    <td>
+    
+`schema`
+    </td>
+    <td>
+
+The schema definition data, which is encoded in the UTF-8 charset.
+    
+* If the schema is a **primitive** schema, this field should be blank.
+  
+* If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
+  </td> 
+  </tr>
+  <tr>
+    <td>
+    
+`properties`
+    </td>
+    <td>The additional properties associated with the schema.</td> 
+  </tr>  
+</table>
+
+Here are examples of the `schema-definition-file`: Example 1 is for a JSON schema, and Example 2 is for a STRING schema.
+
+**Example 1**
+
+```json
+{
+    "type": "JSON",
+    "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":\"string\",\"default\":null},{\"name\":\"file3\",\"type\":[\"null\",\"string\"],\"default\":\"dfdf\"}]}",
+    "properties": {}
+}
+```
+
+**Example 2**
+
+```json
+{
+    "type": "STRING",
+    "schema": "",
+    "properties": {
+        "key1": "value1"
+    }
+}
+```
+
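+The `schema` field holds the schema definition as a JSON *string*, so every quote inside an Avro definition must be escaped, which is why Example 1 is full of `\"` sequences. The following Java sketch generates a `schema-definition-file` with that escaping applied. `SchemaDefinitionFile` and its methods are hypothetical helpers, not part of Pulsar, and `properties` is always written as an empty object for brevity.
+
+```java
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/** Hypothetical helper that writes a schema-definition-file for `pulsar-admin schemas upload`. */
+public class SchemaDefinitionFile {
+
+    /** Escapes quotes, backslashes, and control characters for a JSON string literal. */
+    static String jsonEscape(String s) {
+        StringBuilder sb = new StringBuilder();
+        for (char c : s.toCharArray()) {
+            switch (c) {
+                case '"':  sb.append("\\\""); break;
+                case '\\': sb.append("\\\\"); break;
+                case '\n': sb.append("\\n"); break;
+                case '\r': sb.append("\\r"); break;
+                case '\t': sb.append("\\t"); break;
+                default:
+                    if (c < 0x20) {
+                        sb.append(String.format("\\u%04x", (int) c));
+                    } else {
+                        sb.append(c);
+                    }
+            }
+        }
+        return sb.toString();
+    }
+
+    /** Builds the file body; "properties" is written as an empty object. */
+    public static String build(String type, String definition) {
+        return "{\n"
+            + "    \"type\": \"" + jsonEscape(type) + "\",\n"
+            + "    \"schema\": \"" + jsonEscape(definition) + "\",\n"
+            + "    \"properties\": {}\n"
+            + "}\n";
+    }
+
+    /** Writes the file in UTF-8, overwriting any existing file at that path. */
+    public static void write(Path file, String type, String definition) throws IOException {
+        Files.write(file, build(type, definition).getBytes(StandardCharsets.UTF_8));
+    }
+}
+```
+
+You could then pass the generated file to `pulsar-admin schemas upload --filename`.
+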
+<!--REST API-->
+
+Send a `POST` request to this endpoint: {@inject: endpoint|POST|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/uploadSchema}
+
+The post payload is in JSON format.
+
+```json
+{
+    "type": "<schema-type>",
+    "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+```
+
+The post payload includes the following fields:
+
+<table style="table">
+  <tr>
+    <th>Field</th>
+    <th>Description</th> 
+  </tr>
+  <tr>
+    <td>
+
+`type`
+    </td>
+    <td>
+    The schema type.</td> 
+  </tr>
+  <tr>
+    <td>
+    
+`schema`
+    </td>
+    <td>
+The schema definition data, which is encoded in the UTF-8 charset.
+    
+* If the schema is a **primitive** schema, this field should be blank.
+  
+* If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
+  </td> 
+  </tr>
+  <tr>
+    <td>
+    
+`properties`
+    </td>
+    <td>The additional properties associated with the schema.</td> 
+  </tr>  
+</table>
+
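+If you call this endpoint from Java without the admin library, the JDK 11 `java.net.http` client is sufficient. This sketch only builds the request. The class name `UploadSchemaRequest` and the `webServiceUrl` value (for example `http://localhost:8080`, the default broker HTTP port) are assumptions, and authentication headers are omitted.
+
+```java
+import java.net.URI;
+import java.net.http.HttpRequest;
+
+/** Hypothetical helper that builds (but does not send) an uploadSchema request. */
+public class UploadSchemaRequest {
+
+    /**
+     * @param webServiceUrl broker HTTP URL, for example "http://localhost:8080"
+     * @param topic         topic in "tenant/namespace/topic" form
+     * @param payloadJson   body with "type", "schema", and "properties" fields
+     */
+    public static HttpRequest build(String webServiceUrl, String topic, String payloadJson) {
+        URI uri = URI.create(webServiceUrl + "/admin/v2/schemas/" + topic + "/schema");
+        return HttpRequest.newBuilder(uri)
+                .header("Content-Type", "application/json")
+                .POST(HttpRequest.BodyPublishers.ofString(payloadJson))
+                .build();
+    }
+}
+```
+
+To send it, pass the request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.
+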
+<!--Java Admin API-->
+
+```java
+void createSchema(String topic, PostSchemaPayload schemaPayload)
+```
+
+The `PostSchemaPayload` includes the following fields:
+
+<table style="table">
+  <tr>
+    <th>Field</th>
+    <th>Description</th> 
+  </tr>
+  <tr>
+    <td>
+
+`type`
+    </td>
+    <td>
+    The schema type.</td> 
+  </tr>
+  <tr>
+    <td>
+    
+`schema`
+    </td>
+    <td>
+The schema definition data, which is encoded in the UTF-8 charset.
+    
+* If the schema is a **primitive** schema, this field should be blank.
+  
+* If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
+  </td> 
+  </tr>
+  <tr>
+    <td>
+    
+`properties`
+    </td>
+    <td>The additional properties associated with the schema.</td> 
+  </tr>  
+</table>
+
+Here is an example of `PostSchemaPayload`:
+
+```java
+PulsarAdmin admin = …;
+
+PostSchemaPayload payload = new PostSchemaPayload();
+payload.setType("INT8");
+payload.setSchema("");
+
+admin.schemas().createSchema("my-tenant/my-ns/my-topic", payload);
+```
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+### Get a schema (latest)
+
+To get the latest schema for a topic, you can use one of the following methods. 
+
+<!--DOCUSAURUS_CODE_TABS-->
+
+<!--Admin CLI-->
+
+Use the `get` subcommand.
+
+```bash
+$ pulsar-admin schemas get <topic-name>
+
+{
+    "version": 0,
+    "type": "String",
+    "timestamp": 0,
+    "data": "string",
+    "properties": {
+        "property1": "string",
+        "property2": "string"
+    }
+}
+```
+
+<!--REST API-->
+
+Send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/getSchema}
+
+Here is an example of a response, which is returned in JSON format.
+
+```json
+{
+    "version": "<the-version-number-of-the-schema>",
+    "type": "<the-schema-type>",
+    "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
+    "data": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+```
+
+The response includes the following fields:
+
+<table style="table">
+  <tr>
+    <th>Field</th>
+    <th>Description</th> 
+  </tr>
+  <tr>
+    <td>
+
+`version`
+    </td>
+    <td>
+    The schema version, which is a long number.</td> 
+  </tr>
+  <tr>
+    <td>
+
+`type`
+    </td>
+    <td>
+    The schema type.</td> 
+  </tr>
+  <tr>
+    <td>
+
+`timestamp` 
+    </td>
+    <td>
+    The timestamp when this version of the schema was created.</td>
+  </tr>
+  <tr>
+    <td>
+    
+`data`
+    </td>
+    <td>
+The schema definition data, which is encoded in the UTF-8 charset.
+    
+* If the schema is a **primitive** schema, this field should be blank.
+  
+* If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
+  </td> 
+  </tr>
+  <tr>
+    <td>
+    
+`properties`
+    </td>
+    <td>The additional properties associated with the schema.</td> 
+  </tr>  
+</table>
+
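+A minimal Java sketch of calling this endpoint with the JDK 11 `java.net.http` client follows. `GetLatestSchema`, the `webServiceUrl` parameter, and treating any status other than 200 as an error are assumptions for illustration; the body is returned as raw JSON rather than parsed into the fields listed above.
+
+```java
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+
+/** Hypothetical helper for the getSchema endpoint (latest version). */
+public class GetLatestSchema {
+
+    /** Builds GET {webServiceUrl}/admin/v2/schemas/{tenant}/{namespace}/{topic}/schema. */
+    public static HttpRequest request(String webServiceUrl, String topic) {
+        return HttpRequest.newBuilder(URI.create(webServiceUrl + "/admin/v2/schemas/" + topic + "/schema"))
+                .header("Accept", "application/json")
+                .GET()
+                .build();
+    }
+
+    /** Sends the request and returns the raw JSON body; non-200 responses raise an IOException. */
+    public static String fetch(HttpClient client, String webServiceUrl, String topic)
+            throws IOException, InterruptedException {
+        HttpResponse<String> response =
+                client.send(request(webServiceUrl, topic), HttpResponse.BodyHandlers.ofString());
+        if (response.statusCode() != 200) {
+            throw new IOException("Unexpected HTTP status " + response.statusCode() + ": " + response.body());
+        }
+        return response.body();
+    }
+}
+```
+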
+<!--Java Admin API-->
+
+```java
+SchemaInfo getSchemaInfo(String topic)
+```
+
+The `SchemaInfo` includes the following fields:
+
+<table style="table">
+  <tr>
+    <th>Field</th>
+    <th>Description</th> 
+  </tr>
+  <tr>
+    <td>
+
+`name`
+    </td>
+    <td>
+    The schema name.</td> 
+  </tr>
+  <tr>
+    <td>
+
+`type`
+    </td>
+    <td>
+    The schema type.</td> 
+  </tr>
+  <tr>
+    <td>
+    
+`schema`
+    </td>
+    <td>
+A byte array of the schema definition data, which is encoded in the UTF-8 charset.
+    
+* If the schema is a **primitive** schema, this byte array should be empty. 
+  
+* If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
+  </td> 
+  </tr>
+  <tr>
+    <td>
+    
+`properties`
+    </td>
+    <td>The additional properties associated with the schema.</td> 
+  </tr>  
+</table>
+
+Here is an example of `SchemaInfo`:
+
+```java
+PulsarAdmin admin = …;
+
+SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic");
+```
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+### Get a schema (specific)
+
+To get a specific version of a schema, you can use one of the following methods.
+
+<!--DOCUSAURUS_CODE_TABS-->
+
+<!--Admin CLI-->
+
+Use the `get` subcommand.
+
+```bash
+$ pulsar-admin schemas get <topic-name> --version=<version> 
+```
+
+<!--REST API-->
+
+Send a `GET` request to a schema endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema/:version|operation/getSchema}
+
+Here is an example of a response, which is returned in JSON format.
+
+```json
+{
+    "version": "<the-version-number-of-the-schema>",
+    "type": "<the-schema-type>",
+    "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
+    "data": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+```
+
+The response includes the following fields:
+
+<table style="table">
+  <tr>
+    <th>Field</th>
+    <th>Description</th> 
+  </tr>
+  <tr>
+    <td>
+
+`version`
+    </td>
+    <td>
+    The schema version, which is a long number.</td> 
+  </tr>
+  <tr>
+    <td>
+
+`type`
+    </td>
+    <td>
+    The schema type.</td> 
+  </tr>
+  <tr>
+    <td>
+
+`timestamp` 
+    </td>
+    <td>
+    The timestamp when this version of the schema was created.</td>
+  </tr>
+  <tr>
+    <td>
+    
+`data`
+    </td>
+    <td>
+The schema definition data, which is encoded in the UTF-8 charset.
+    
+* If the schema is a **primitive** schema, this field should be blank.
+  
+* If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.
+  </td> 
+  </tr>
+  <tr>
+    <td>
+    
+`properties`
+    </td>
+    <td>The additional properties associated with the schema.</td> 
+  </tr>  
+</table>
+
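+The only difference from fetching the latest schema is the trailing `:version` path segment. In this hypothetical Java sketch (the class and parameter names are illustrative), the request is built for a given version. Rejecting negative numbers is an assumption based on versions being counted from 0, as in the `"version": 0` sample output of `pulsar-admin schemas get`.
+
+```java
+import java.net.URI;
+import java.net.http.HttpRequest;
+
+/** Hypothetical helper that builds a GET request for one specific schema version. */
+public class GetSchemaVersionRequest {
+
+    public static HttpRequest build(String webServiceUrl, String topic, long version) {
+        // Assumption: versions are counted from 0, so a negative number is a caller error.
+        if (version < 0) {
+            throw new IllegalArgumentException("Schema version must not be negative: " + version);
+        }
+        URI uri = URI.create(webServiceUrl + "/admin/v2/schemas/" + topic + "/schema/" + version);
+        return HttpRequest.newBuilder(uri).GET().build();
+    }
+}
+```
+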
+<!--Java Admin API-->
+
+```java
+SchemaInfo getSchemaInfo(String topic, long version)
+```
+
+The `SchemaInfo` includes the following fields:
+
+<table style="table">
+  <tr>
+    <th>Field</th>
+    <th>Description</th> 
+  </tr>
+  <tr>
+    <td>
+
+`name`
+    </td>
+    <td>The schema name.</td> 
+  </tr>
+  <tr>
+    <td>
+
+`type`
+    </td>
+    <td>The schema type.</td> 
+  </tr>
+  <tr>
+    <td>
+    
+`schema`
+    </td>
+    <td>
+A byte array of the schema definition data, which is encoded in UTF-8.
+    
+* If the schema is a **primitive** schema, this byte array should be empty.
+  
+* If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array.
+  </td> 
+  </tr>
+  <tr>
+    <td>
+    
+`properties`
+    </td>
+    <td>The additional properties associated with the schema.</td> 
+  </tr>  
+</table>
+
+Here is an example of `SchemaInfo`:
+
+```java
+PulsarAdmin admin = …;
+
+SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic", 1L);
+```
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+### Extract a schema
+
+To extract a schema from a class in a JAR file and provide it to a topic, you can use the following method.
+
+<!--DOCUSAURUS_CODE_TABS-->
+
+<!--Admin CLI-->
+
+Use the `extract` subcommand.
+
+```bash
+$ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name> <topic-name>
+```
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+### Delete a schema
+
+To delete a schema for a topic, you can use one of the following methods.
+
+> #### Note
+> 
+> In any case, the **delete** action deletes **all versions** of a schema registered for a topic.
+
+<!--DOCUSAURUS_CODE_TABS-->
+
+<!--Admin CLI-->
+
+Use the `delete` subcommand.
+
+```bash
+$ pulsar-admin schemas delete <topic-name>
+```
+
+<!--REST API-->
+
+Send a `DELETE` request to a schema endpoint: {@inject: endpoint|DELETE|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/deleteSchema} 
+
+Here is an example of a response, which is returned in JSON format.
+
+```json
+{
+    "version": "<the-latest-version-number-of-the-schema>"
+}
+```
+
+The response includes the following field:
+
+Field | Description |
+---|---|
+`version` | The schema version, which is a long number. | 
+
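+A Java sketch of the delete call, again using the JDK 11 `java.net.http` client, is shown below together with a helper that reads `version` from the response. `DeleteSchemaRequest` is a hypothetical name, and the regular expression is a deliberately naive stand-in for a real JSON parser; it accepts the number either quoted or unquoted.
+
+```java
+import java.net.URI;
+import java.net.http.HttpRequest;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Hypothetical helper for the deleteSchema endpoint. */
+public class DeleteSchemaRequest {
+
+    private static final Pattern VERSION = Pattern.compile("\"version\"\\s*:\\s*\"?(\\d+)\"?");
+
+    /** Builds DELETE {webServiceUrl}/admin/v2/schemas/{tenant}/{namespace}/{topic}/schema. */
+    public static HttpRequest build(String webServiceUrl, String topic) {
+        return HttpRequest.newBuilder(URI.create(webServiceUrl + "/admin/v2/schemas/" + topic + "/schema"))
+                .DELETE()
+                .build();
+    }
+
+    /** Extracts the version from a body such as {"version": 3}; not a general JSON parser. */
+    public static long parseVersion(String body) {
+        Matcher m = VERSION.matcher(body);
+        if (!m.find()) {
+            throw new IllegalArgumentException("No version field in: " + body);
+        }
+        return Long.parseLong(m.group(1));
+    }
+}
+```
+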
+<!--Java Admin API-->
+
+```java
+void deleteSchema(String topic)
+```
+
+Here is an example of deleting a schema.
+
+```java
+PulsarAdmin admin = …;
+
+admin.schemas().deleteSchema("my-tenant/my-ns/my-topic");
+```
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+## Custom schema storage
+
+By default, Pulsar stores schemas of all types in [Apache BookKeeper](https://bookkeeper.apache.org) deployed alongside Pulsar.
+
+However, you can use another storage system if needed. 
+
+### Implement
+
+To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces: 
+
+* [SchemaStorage interface](#schemastorage-interface) 
+
+* [SchemaStorageFactory interface](#schemastoragefactory-interface)
+
+#### SchemaStorage interface
+
+The `SchemaStorage` interface has the following methods:
+
+```java
+public interface SchemaStorage {
+    // How schemas are updated
+    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
+
+    // How schemas are fetched from storage
+    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
+
+    // How schemas are deleted
+    CompletableFuture<SchemaVersion> delete(String key);
+
+    // Utility method for converting a schema version byte array to a SchemaVersion object
+    SchemaVersion versionFromBytes(byte[] version);
+
+    // Startup behavior for the schema storage client
+    void start() throws Exception;
+
+    // Shutdown behavior for the schema storage client
+    void close() throws Exception;
+}
+```
+
+> #### Tip
+> 
+> For a complete example of a **schema storage** implementation, see the [BookkeeperSchemaStorage](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java) class.
+
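+To make the contract above concrete, here is a deliberately simplified, self-contained Java sketch of an in-memory store with similar method shapes. It is an illustration under loud assumptions, not a drop-in implementation: it uses `long` versions instead of `SchemaVersion` and raw bytes instead of `StoredSchema`, ignores the `hash` argument, and therefore does not implement the real `SchemaStorage` interface. The class name `InMemorySchemaStore` and the choice to return `-1` when deleting an unknown key are hypothetical.
+
+```java
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Hypothetical in-memory store mirroring the shape of SchemaStorage; NOT the real interface. */
+public class InMemorySchemaStore {
+
+    private final Map<String, List<byte[]>> schemas = new HashMap<>();
+
+    /** Appends a new version for the key and completes with its 0-based number (hash ignored). */
+    public synchronized CompletableFuture<Long> put(String key, byte[] value, byte[] hash) {
+        List<byte[]> versions = schemas.computeIfAbsent(key, k -> new ArrayList<>());
+        versions.add(value.clone());
+        return CompletableFuture.completedFuture((long) (versions.size() - 1));
+    }
+
+    /** Completes with a copy of the stored bytes, or null if the key or version does not exist. */
+    public synchronized CompletableFuture<byte[]> get(String key, long version) {
+        List<byte[]> versions = schemas.get(key);
+        if (versions == null || version < 0 || version >= versions.size()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return CompletableFuture.completedFuture(versions.get((int) version).clone());
+    }
+
+    /** Removes all versions and completes with the last version number, or -1 if none existed. */
+    public synchronized CompletableFuture<Long> delete(String key) {
+        List<byte[]> removed = schemas.remove(key);
+        long last = removed == null ? -1L : removed.size() - 1;
+        return CompletableFuture.completedFuture(last);
+    }
+
+    public void start() {
+        // Nothing to connect to for an in-memory store.
+    }
+
+    public synchronized void close() {
+        schemas.clear();
+    }
+}
+```
+
+Every mutating or reading method is `synchronized`, so concurrent `put` and `delete` calls cannot interleave, and the returned futures are already completed because no I/O is involved.
+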
+#### SchemaStorageFactory interface 
+
+The `SchemaStorageFactory` interface has the following method:
+
+```java
+public interface SchemaStorageFactory {
+    @NotNull
+    SchemaStorage create(PulsarService pulsar) throws Exception;
+}
+```
+
+> #### Tip
+> 
+> For a complete example of a **schema storage factory** implementation, see the [BookkeeperSchemaStorageFactory](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java) class.
+
+### Deploy
+
+To use your custom schema storage implementation, perform the following steps.
+
+1. Package the implementation in a [JAR](https://docs.oracle.com/javase/tutorial/deployment/jar/basicsindex.html) file.
+   
+2. Add the JAR file to the `lib` folder in your Pulsar binary or source distribution.
+   
+3. Change the `schemaRegistryStorageClassName` configuration in `broker.conf` to your custom factory class.
+      
+4. Start Pulsar.
diff --git a/site2/website/sidebars.json b/site2/website/sidebars.json
index 6fdb5a4..cc03832 100644
--- a/site2/website/sidebars.json
+++ b/site2/website/sidebars.json
@@ -19,7 +19,10 @@
       "concepts-schema-registry"
     ],
     "Pulsar Schema": [
-      "schema-get-started"
+      "schema-get-started",
+      "schema-understand",
+      "schema-evolution-compatibility",
+      "schema-manage"
     ],
     "Pulsar Functions": [
       "functions-overview",