Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/26 10:15:19 UTC

[flink] branch master updated (ac1b8db -> b820606)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ac1b8db  [FLINK-13726][docs] Build docs with jekyll 4.0.0.pre.beta1
     new 00b6e8b  [FLINK-13359][docs] Add documentation for DDL introduction
     new b820606  [FLINK-13362][docs] Add DDL documentation for Kafka, ElasticSearch, FileSystem and formats

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/table/connect.md    | 270 +++++++++++++++++++++++++++++++++++++++++
 docs/dev/table/connect.zh.md | 283 +++++++++++++++++++++++++++++++++++++++++--
 docs/dev/table/sql.md        | 163 +++++++++++++++++++++----
 docs/dev/table/sql.zh.md     | 177 +++++++++++++++++++++++----
 4 files changed, 836 insertions(+), 57 deletions(-)


[flink] 02/02: [FLINK-13362][docs] Add DDL documentation for Kafka, ElasticSearch, FileSystem and formats

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b820606916421f40ffa700c9a403d0782f618320
Author: Jark Wu <im...@gmail.com>
AuthorDate: Mon Aug 26 13:33:19 2019 +0800

    [FLINK-13362][docs] Add DDL documentation for Kafka, ElasticSearch, FileSystem and formats
---
 docs/dev/table/connect.md    | 270 +++++++++++++++++++++++++++++++++++++++++
 docs/dev/table/connect.zh.md | 283 +++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 546 insertions(+), 7 deletions(-)

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 14d2a3b..5378ae9 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -122,6 +122,12 @@ format: ...
 schema: ...
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+tableEnvironment.sqlUpdate("CREATE TABLE MyTable (...) WITH (...)")
+{% endhighlight %}
+</div>
 </div>
 
 The table's type (`source`, `sink`, or `both`) determines how a table is registered. In case of table type `both`, both a table source and table sink are registered under the same name. Logically, this means that we can both read and write to such a table similarly to a table in a regular DBMS.
@@ -276,6 +282,39 @@ tables:
         type: VARCHAR
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  `user` BIGINT,
+  message VARCHAR,
+  ts VARCHAR
+) WITH (
+  -- declare the external system to connect to
+  'connector.type' = 'kafka',
+  'connector.version' = '0.10',
+  'connector.topic' = 'topic_name',
+  'connector.startup-mode' = 'earliest-offset',
+  'connector.properties.0.key' = 'zookeeper.connect',
+  'connector.properties.0.value' = 'localhost:2181',
+  'connector.properties.1.key' = 'bootstrap.servers',
+  'connector.properties.1.value' = 'localhost:9092',
+  'update-mode' = 'append',
+  -- declare a format for this system
+  'format.type' = 'avro',
+  'format.avro-schema' = '{
+                            "namespace": "org.myorganization",
+                            "type": "record",
+                            "name": "UserMessage",
+                            "fields": [
+                                {"name": "ts", "type": "string"},
+                                {"name": "user", "type": "long"},
+                                {"name": "message", "type": ["string", "null"]}
+                            ]
+                         }'
+)
+{% endhighlight %}
+</div>
 </div>
 
 In both ways the desired connection properties are converted into normalized, string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly-one matching table factory.
@@ -603,6 +642,16 @@ tables:
     update-mode: append    # otherwise: "retract" or "upsert"
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+ ...
+) WITH (
+ 'update-mode' = 'append'  -- otherwise: 'retract' or 'upsert'
+)
+{% endhighlight %}
+</div>
 </div>
 
 See also the [general streaming concepts documentation](streaming/dynamic_tables.html#continuous-queries) for more information.
@@ -652,6 +701,17 @@ connector:
   path: "file:///path/to/whatever"    # required: path to a file or directory
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'filesystem',               -- required: specify the connector type
+  'connector.path' = 'file:///path/to/whatever'  -- required: path to a file or directory
+)
+{% endhighlight %}
+</div>
 </div>
 
 The file system connector itself is included in Flink and does not require an additional dependency. A corresponding format needs to be specified for reading and writing rows from and to a file system.
@@ -753,6 +813,49 @@ connector:
   sink-partitioner-class: org.mycompany.MyPartitioner  # optional: used in case of sink partitioner custom
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'kafka',       
+
+  'connector.version' = '0.11',     -- required: valid connector versions are
+                                    -- "0.8", "0.9", "0.10", "0.11", and "universal"
+
+  'connector.topic' = 'topic_name', -- required: topic name from which the table is read
+
+  'update-mode' = 'append',         -- required: update mode when used as a table sink;
+                                    -- only append mode is supported currently.
+
+  'connector.properties.0.key' = 'zookeeper.connect', -- optional: connector specific properties
+  'connector.properties.0.value' = 'localhost:2181',
+  'connector.properties.1.key' = 'bootstrap.servers',
+  'connector.properties.1.value' = 'localhost:9092',
+  'connector.properties.2.key' = 'group.id',
+  'connector.properties.2.value' = 'testGroup',
+  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset", 
+                                                   -- "latest-offset", "group-offsets", 
+                                                   -- or "specific-offsets"
+
+  -- optional: used in case of startup mode with specific offsets
+  'connector.specific-offsets.0.partition' = '0',
+  'connector.specific-offsets.0.offset' = '42',
+  'connector.specific-offsets.1.partition' = '1',
+  'connector.specific-offsets.1.offset' = '300',
+
+  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions
+                                         -- into Kafka's partitions. Valid values are "fixed"
+                                         -- (each Flink partition ends up in at most one Kafka partition),
+                                         -- "round-robin" (a Flink partition is distributed to
+                                         -- Kafka partitions round-robin),
+                                         -- or "custom" (use a custom FlinkKafkaPartitioner subclass)
+  -- optional: used in case of a custom sink partitioner
+  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner'
+)
+{% endhighlight %}
+</div>
 </div>
 
 **Specify the start reading position:** By default, the Kafka source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions, which correspond to the configurations in section [Kafka Consumers Start Position Configuration]({{ site.baseurl }}/dev/connectors/kafka.html#kafka-consumers-start-position-configuration).
@@ -900,6 +1003,66 @@ connector:
     connection-path-prefix: "/v1"     # optional: prefix string to be added to every REST communication
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'elasticsearch', -- required: specify that the connector is Elasticsearch
+  
+  'connector.version' = '6',          -- required: valid connector versions are "6"
+  
+  'connector.hosts.0.hostname' = 'host_name',  -- required: one or more Elasticsearch hosts to connect to
+  'connector.hosts.0.port' = '9200',
+  'connector.hosts.0.protocol' = 'http',
+
+  'connector.index' = 'MyUsers',       -- required: Elasticsearch index
+
+  'connector.document-type' = 'user',  -- required: Elasticsearch document type
+
+  'update-mode' = 'append',            -- optional: update mode when used as a table sink.
+
+  'connector.key-delimiter' = '$',     -- optional: delimiter for composite keys ("_" by default)
+                                       -- e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+
+  'connector.key-null-literal' = 'n/a',  -- optional: representation for null fields in keys ("null" by default)
+
+  'connector.failure-handler' = '...',   -- optional: failure handling strategy in case a request to 
+                                         -- Elasticsearch fails ("fail" by default).
+                                         -- valid strategies are 
+                                         -- "fail" (throws an exception if a request fails and
+                                         -- thus causes a job failure), 
+                                         -- "ignore" (ignores failures and drops the request),
+                                         -- "retry-rejected" (re-adds requests that have failed due 
+                                         -- to queue capacity saturation), 
+                                         -- or "custom" for failure handling with an
+                                         -- ActionRequestFailureHandler subclass
+
+  -- optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+  'connector.flush-on-checkpoint' = 'true',   -- optional: set to "false" to disable flushing on checkpoint
+                                              -- (see notes below!) ("true" by default)
+  'connector.bulk-flush.max-actions' = '42',  -- optional: maximum number of actions to buffer 
+                                              -- for each bulk request
+  'connector.bulk-flush.max-size' = '42 mb',  -- optional: maximum size of buffered actions in bytes
+                                              -- per bulk request
+                                              -- (only MB granularity is supported)
+  'connector.bulk-flush.interval' = '60000',  -- optional: bulk flush interval (in milliseconds)
+  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff strategy ("disabled" by default)
+                                                      -- valid strategies are "disabled", "constant",
+                                                      -- or "exponential"
+  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum number of retries
+  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay between each backoff attempt
+                                                      -- (in milliseconds)
+
+  -- optional: connection properties to be used during REST communication to Elasticsearch
+  'connector.connection-max-retry-timeout' = '3',     -- optional: maximum timeout (in milliseconds)
+                                                      -- between retries
+  'connector.connection-path-prefix' = '/v1'          -- optional: prefix string to be added to every
+                                                      -- REST communication
+)
+{% endhighlight %}
+</div>
 </div>
 
 **Bulk flushing:** For more information about characteristics of the optional flushing parameters see the [corresponding low-level documentation]({{ site.baseurl }}/dev/connectors/elasticsearch.html).
@@ -1019,6 +1182,38 @@ format:
                                #   null value (disabled by default)
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the format type
+
+  'format.fields.0.name' = 'lon',         -- required: define the schema either by using type information
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+
+  'format.derive-schema' = 'true',        -- or use the table's schema
+
+  'format.field-delimiter' = ';',         -- optional: field delimiter character (',' by default)
+  'format.line-delimiter' = '\r\n',       -- optional: line delimiter ("\n" by default; otherwise
+                                          -- "\r" or "\r\n" are allowed)
+  'format.quote-character' = '''',        -- optional: quote character for enclosing field values ('"' by default)
+  'format.allow-comments' = 'true',       -- optional: ignores comment lines that start with "#"
+                                          -- (disabled by default);
+                                          -- if enabled, make sure to also ignore parse errors to allow empty rows
+  'format.ignore-parse-errors' = 'true',  -- optional: skip fields and rows with parse errors instead of failing;
+                                          -- fields are set to null in case of errors
+  'format.array-element-delimiter' = '|', -- optional: the array element delimiter string for separating
+                                          -- array and row element values (";" by default)
+  'format.escape-character' = '\\',       -- optional: escape character for escaping values (disabled by default)
+  'format.null-literal' = 'n/a'           -- optional: null literal string that is interpreted as a
+                                          -- null value (disabled by default)
+)
+{% endhighlight %}
+</div>
 </div>
 
 The following table lists supported types that can be read and written:
@@ -1171,6 +1366,38 @@ format:
   derive-schema: true
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'json',                   -- required: specify the format type
+  'format.fail-on-missing-field' = 'true',  -- optional: whether to fail if a field is missing ("false" by default)
+
+  'format.fields.0.name' = 'lon',           -- required: define the schema either by using a type string which parses numbers to corresponding types
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+
+  'format.json-schema' =                    -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP
+    '{
+      "type": "object",
+      "properties": {
+        "lon": {
+          "type": "number"
+        },
+        "rideTime": {
+          "type": "string",
+          "format": "date-time"
+        }
+      }
+    }',
+
+  'format.derive-schema' = 'true'          -- or use the table's schema
+)
+{% endhighlight %}
+</div>
 </div>
 
 The following table shows the mapping of JSON schema types to Flink SQL types:
@@ -1318,6 +1545,27 @@ format:
     }
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'avro',                                 -- required: specify the format type
+  'format.record-class' = 'org.organization.types.User',  -- required: define the schema either by using an Avro specific record class
+
+  'format.avro-schema' =                                  -- or by using an Avro schema
+    '{
+      "type": "record",
+      "name": "test",
+      "fields" : [
+        {"name": "a", "type": "long"},
+        {"name": "b", "type": "string"}
+      ]
+    }'
+)
+{% endhighlight %}
+</div>
 </div>
 
 Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability otherwise they are converted to an `ANY` type. The following table shows the mapping:
@@ -1410,6 +1658,28 @@ format:
   ignore-parse-errors: true  # optional: skip records with parse error instead of failing by default
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the format type
+
+  'format.fields.0.name' = 'lon',         -- required: define the schema either by using type information
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+  
+  'format.field-delimiter' = ',',         -- optional: string delimiter "," by default
+  'format.line-delimiter' = '\n',         -- optional: string delimiter "\n" by default
+  'format.quote-character' = '"',         -- optional: single character for string values, empty by default
+  'format.comment-prefix' = '#',          -- optional: string to indicate comments, empty by default
+  'format.ignore-first-line' = 'false',   -- optional: boolean flag to ignore the first line, by default it is not skipped
+  'format.ignore-parse-errors' = 'true'   -- optional: skip records with parse error instead of failing by default
+)
+{% endhighlight %}
+</div>
 </div>
 
 The old CSV format is included in Flink and does not require additional dependencies.
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index 9aeacd9..6deb6af 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "连接外部系统"
+title: "Connect to External Systems"
 nav-parent_id: tableapi
 nav-pos: 19
 ---
@@ -122,6 +122,12 @@ format: ...
 schema: ...
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+tableEnvironment.sqlUpdate("CREATE TABLE MyTable (...) WITH (...)")
+{% endhighlight %}
+</div>
 </div>
 
 The table's type (`source`, `sink`, or `both`) determines how a table is registered. In case of table type `both`, both a table source and table sink are registered under the same name. Logically, this means that we can both read and write to such a table similarly to a table in a regular DBMS.
@@ -276,6 +282,39 @@ tables:
         type: VARCHAR
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  `user` BIGINT,
+  message VARCHAR,
+  ts VARCHAR
+) WITH (
+  -- declare the external system to connect to
+  'connector.type' = 'kafka',
+  'connector.version' = '0.10',
+  'connector.topic' = 'topic_name',
+  'connector.startup-mode' = 'earliest-offset',
+  'connector.properties.0.key' = 'zookeeper.connect',
+  'connector.properties.0.value' = 'localhost:2181',
+  'connector.properties.1.key' = 'bootstrap.servers',
+  'connector.properties.1.value' = 'localhost:9092',
+  'update-mode' = 'append',
+  -- declare a format for this system
+  'format.type' = 'avro',
+  'format.avro-schema' = '{
+                            "namespace": "org.myorganization",
+                            "type": "record",
+                            "name": "UserMessage",
+                            "fields": [
+                                {"name": "ts", "type": "string"},
+                                {"name": "user", "type": "long"},
+                                {"name": "message", "type": ["string", "null"]}
+                            ]
+                         }'
+)
+{% endhighlight %}
+</div>
 </div>
 
 In both ways the desired connection properties are converted into normalized, string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly-one matching table factory.
@@ -603,6 +642,16 @@ tables:
     update-mode: append    # otherwise: "retract" or "upsert"
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+ ...
+) WITH (
+ 'update-mode' = 'append'  -- otherwise: 'retract' or 'upsert'
+)
+{% endhighlight %}
+</div>
 </div>
 
 See also the [general streaming concepts documentation](streaming/dynamic_tables.html#continuous-queries) for more information.
@@ -652,6 +701,17 @@ connector:
   path: "file:///path/to/whatever"    # required: path to a file or directory
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'filesystem',               -- required: specify the connector type
+  'connector.path' = 'file:///path/to/whatever'  -- required: path to a file or directory
+)
+{% endhighlight %}
+</div>
 </div>
 
 The file system connector itself is included in Flink and does not require an additional dependency. A corresponding format needs to be specified for reading and writing rows from and to a file system.
@@ -703,7 +763,7 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t
     .version("0.11")  # required: valid connector versions are
                       # "0.8", "0.9", "0.10", "0.11", and "universal"
     .topic("...")     # required: topic name from which the table is read
-    
+
     # optional: connector specific properties
     .property("zookeeper.connect", "localhost:2181")
     .property("bootstrap.servers", "localhost:9092")
@@ -753,6 +813,49 @@ connector:
   sink-partitioner-class: org.mycompany.MyPartitioner  # optional: used in case of sink partitioner custom
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'kafka',
+
+  'connector.version' = '0.11',     -- required: valid connector versions are
+                                    -- "0.8", "0.9", "0.10", "0.11", and "universal"
+
+  'connector.topic' = 'topic_name', -- required: topic name from which the table is read
+
+  'update-mode' = 'append',         -- required: update mode when used as a table sink;
+                                    -- only append mode is supported currently.
+
+  'connector.properties.0.key' = 'zookeeper.connect', -- optional: connector specific properties
+  'connector.properties.0.value' = 'localhost:2181',
+  'connector.properties.1.key' = 'bootstrap.servers',
+  'connector.properties.1.value' = 'localhost:9092',
+  'connector.properties.2.key' = 'group.id',
+  'connector.properties.2.value' = 'testGroup',
+  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset",
+                                                   -- "latest-offset", "group-offsets",
+                                                   -- or "specific-offsets"
+
+  -- optional: used in case of startup mode with specific offsets
+  'connector.specific-offsets.0.partition' = '0',
+  'connector.specific-offsets.0.offset' = '42',
+  'connector.specific-offsets.1.partition' = '1',
+  'connector.specific-offsets.1.offset' = '300',
+
+  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions
+                                         -- into Kafka's partitions. Valid values are "fixed"
+                                         -- (each Flink partition ends up in at most one Kafka partition),
+                                         -- "round-robin" (a Flink partition is distributed to
+                                         -- Kafka partitions round-robin),
+                                         -- or "custom" (use a custom FlinkKafkaPartitioner subclass)
+  -- optional: used in case of a custom sink partitioner
+  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner'
+)
+{% endhighlight %}
+</div>
 </div>
 
 **Specify the start reading position:** By default, the Kafka source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions, which correspond to the configurations in section [Kafka Consumers Start Position Configuration]({{ site.baseurl }}/dev/connectors/kafka.html#kafka-consumers-start-position-configuration).
@@ -900,6 +1003,66 @@ connector:
     connection-path-prefix: "/v1"     # optional: prefix string to be added to every REST communication
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'elasticsearch', -- required: specify that the connector is Elasticsearch
+
+  'connector.version' = '6',          -- required: valid connector versions are "6"
+
+  'connector.hosts.0.hostname' = 'host_name',  -- required: one or more Elasticsearch hosts to connect to
+  'connector.hosts.0.port' = '9200',
+  'connector.hosts.0.protocol' = 'http',
+
+  'connector.index' = 'MyUsers',       -- required: Elasticsearch index
+
+  'connector.document-type' = 'user',  -- required: Elasticsearch document type
+
+  'update-mode' = 'append',            -- optional: update mode when used as a table sink.
+
+  'connector.key-delimiter' = '$',     -- optional: delimiter for composite keys ("_" by default)
+                                       -- e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+
+  'connector.key-null-literal' = 'n/a',  -- optional: representation for null fields in keys ("null" by default)
+
+  'connector.failure-handler' = '...',   -- optional: failure handling strategy in case a request to
+                                         -- Elasticsearch fails ("fail" by default).
+                                         -- valid strategies are
+                                         -- "fail" (throws an exception if a request fails and
+                                         -- thus causes a job failure),
+                                         -- "ignore" (ignores failures and drops the request),
+                                         -- "retry-rejected" (re-adds requests that have failed due
+                                         -- to queue capacity saturation),
+                                         -- or "custom" for failure handling with an
+                                         -- ActionRequestFailureHandler subclass
+
+  -- optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+  'connector.flush-on-checkpoint' = 'true',   -- optional: set to "false" to disable flushing on checkpoint
+                                              -- (see notes below!) ("true" by default)
+  'connector.bulk-flush.max-actions' = '42',  -- optional: maximum number of actions to buffer
+                                              -- for each bulk request
+  'connector.bulk-flush.max-size' = '42 mb',  -- optional: maximum size of buffered actions in bytes
+                                              -- per bulk request
+                                              -- (only MB granularity is supported)
+  'connector.bulk-flush.interval' = '60000',  -- optional: bulk flush interval (in milliseconds)
+  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff strategy ("disabled" by default)
+                                                      -- valid strategies are "disabled", "constant",
+                                                      -- or "exponential"
+  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum number of retries
+  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay between each backoff attempt
+                                                      -- (in milliseconds)
+
+  -- optional: connection properties to be used during REST communication to Elasticsearch
+  'connector.connection-max-retry-timeout' = '3',     -- optional: maximum timeout (in milliseconds)
+                                                      -- between retries
+  'connector.connection-path-prefix' = '/v1'          -- optional: prefix string to be added to every
+                                                      -- REST communication
+)
+{% endhighlight %}
+</div>
 </div>
 
 **Bulk flushing:** For more information about characteristics of the optional flushing parameters see the [corresponding low-level documentation]({{ site.baseurl }}/dev/connectors/elasticsearch.html).
@@ -977,17 +1140,17 @@ The CSV format can be used as follows:
     # or use the table's schema
     .derive_schema()
 
-    .field_delimiter(";")          # optional: field delimiter character ("," by default)
+    .field_delimiter(';')          # optional: field delimiter character (',' by default)
     .line_delimiter("\r\n")        # optional: line delimiter ("\n" by default;
                                    #   otherwise "\r" or "\r\n" are allowed)
-    .quote_character("'")         # optional: quote character for enclosing field values ('"' by default)
-    .allow_comments()              # optional: ignores comment lines that start with "#" (disabled by default);
+    .quote_character('\'')         # optional: quote character for enclosing field values ('"' by default)
+    .allow_comments()              # optional: ignores comment lines that start with '#' (disabled by default);
                                    #   if enabled, make sure to also ignore parse errors to allow empty rows
     .ignore_parse_errors()         # optional: skip fields and rows with parse errors instead of failing;
                                    #   fields are set to null in case of errors
     .array_element_delimiter("|")  # optional: the array element delimiter string for separating
                                    #   array and row element values (";" by default)
-    .escape_character("\\")        # optional: escape character for escaping values (disabled by default)
+    .escape_character('\\')        # optional: escape character for escaping values (disabled by default)
     .null_literal("n/a")           # optional: null literal string that is interpreted as a
                                    #   null value (disabled by default)
 )
@@ -1019,6 +1182,38 @@ format:
                                #   null value (disabled by default)
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the format type
+
+  'format.fields.0.name' = 'lon',         -- required: define the schema either by using type information
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+
+  'format.derive-schema' = 'true',        -- or use the table's schema
+
+  'format.field-delimiter' = ';',         -- optional: field delimiter character (',' by default)
+  'format.line-delimiter' = '\r\n',       -- optional: line delimiter ("\n" by default; otherwise
+                                          -- "\r" or "\r\n" are allowed)
+  'format.quote-character' = '''',        -- optional: quote character for enclosing field values ('"' by default)
+  'format.allow-comments' = 'true',       -- optional: ignores comment lines that start with "#"
+                                          -- (disabled by default);
+                                          -- if enabled, make sure to also ignore parse errors to allow empty rows
+  'format.ignore-parse-errors' = 'true',  -- optional: skip fields and rows with parse errors instead of failing;
+                                          -- fields are set to null in case of errors
+  'format.array-element-delimiter' = '|', -- optional: the array element delimiter string for separating
+                                          -- array and row element values (";" by default)
+  'format.escape-character' = '\\',       -- optional: escape character for escaping values (disabled by default)
+  'format.null-literal' = 'n/a'           -- optional: null literal string that is interpreted as a
+                                          -- null value (disabled by default)
+)
+{% endhighlight %}
+</div>
 </div>
 
 The following table lists supported types that can be read and written:
@@ -1171,6 +1366,38 @@ format:
   derive-schema: true
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'json',                   -- required: specify the format type
+  'format.fail-on-missing-field' = 'true',  -- optional: whether to fail if a field is missing ("false" by default)
+
+  'format.fields.0.name' = 'lon',           -- required: define the schema either by using a type string which parses numbers to corresponding types
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+
+  'format.json-schema' =                    -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP
+    '{
+      "type": "object",
+      "properties": {
+        "lon": {
+          "type": "number"
+        },
+        "rideTime": {
+          "type": "string",
+          "format": "date-time"
+        }
+      }
+    }',
+
+  'format.derive-schema' = 'true'          -- or use the table's schema
+)
+{% endhighlight %}
+</div>
 </div>
 
 The following table shows the mapping of JSON schema types to Flink SQL types:
@@ -1318,6 +1545,27 @@ format:
     }
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'avro',                                 -- required: specify the format type
+  'format.record-class' = 'org.organization.types.User',  -- required: define the schema either by using an Avro specific record class
+
+  'format.avro-schema' =                                  -- or by using an Avro schema
+    '{
+      "type": "record",
+      "name": "test",
+      "fields" : [
+        {"name": "a", "type": "long"},
+        {"name": "b", "type": "string"}
+      ]
+    }'
+)
+{% endhighlight %}
+</div>
 </div>
 
 Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability otherwise they are converted to an `ANY` type. The following table shows the mapping:
@@ -1410,6 +1658,28 @@ format:
   ignore-parse-errors: true  # optional: skip records with parse error instead of failing by default
 {% endhighlight %}
 </div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the format type
+
+  'format.fields.0.name' = 'lon',         -- required: define the schema either by using type information
+  'format.fields.0.type' = 'FLOAT',
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.type' = 'TIMESTAMP',
+
+  'format.field-delimiter' = ',',         -- optional: string delimiter "," by default
+  'format.line-delimiter' = '\n',         -- optional: string delimiter "\n" by default
+  'format.quote-character' = '"',         -- optional: single character for string values, empty by default
+  'format.comment-prefix' = '#',          -- optional: string to indicate comments, empty by default
+  'format.ignore-first-line' = 'false',   -- optional: boolean flag to ignore the first line, by default it is not skipped
+  'format.ignore-parse-errors' = 'true'   -- optional: skip records with parse error instead of failing by default
+)
+{% endhighlight %}
+</div>
 </div>
 
 The old CSV format is included in Flink and does not require additional dependencies.
@@ -1532,7 +1802,6 @@ table.insertInto("csvOutputTable")
 {% endhighlight %}
 </div>
 
-
 <div data-lang="python" markdown="1">
 {% highlight python %}
 


[flink] 01/02: [FLINK-13359][docs] Add documentation for DDL introduction

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 00b6e8bd3ae3943c24e0538debcae82df35dac4d
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Tue Aug 6 12:53:35 2019 +0800

    [FLINK-13359][docs] Add documentation for DDL introduction
    
    This closes #9366
---
 docs/dev/table/sql.md    | 163 ++++++++++++++++++++++++++++++++++++-------
 docs/dev/table/sql.zh.md | 177 ++++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 290 insertions(+), 50 deletions(-)

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index e607716..79f0b41 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -22,19 +22,20 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+This is a complete list of Data Definition Language (DDL) and Data Manipulation Language (DML) constructs supported in Flink.
+* This will be replaced by the TOC
+{:toc} 
+
+## Query
 SQL queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream](common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink](common.html#emit-a-table)). SQL and Table API queries can be seamlessly mixed and are holistically optimized and tra [...]
 
-In order to access a table in a SQL query, it must be [registered in the TableEnvironment](common.html#register-tables-in-the-catalog). A table can be registered from a [TableSource](common.html#register-a-tablesource), [Table](common.html#register-a-table), [DataStream, or DataSet](common.html#register-a-datastream-or-dataset-as-table). Alternatively, users can also [register external catalogs in a TableEnvironment](common.html#register-an-external-catalog) to specify the location of th [...]
+In order to access a table in a SQL query, it must be [registered in the TableEnvironment](common.html#register-tables-in-the-catalog). A table can be registered from a [TableSource](common.html#register-a-tablesource), [Table](common.html#register-a-table), [CREATE TABLE statement](#create-table), [DataStream, or DataSet](common.html#register-a-datastream-or-dataset-as-table). Alternatively, users can also [register external catalogs in a TableEnvironment](common.html#register-an-extern [...]
 
 For convenience `Table.toString()` automatically registers the table under a unique name in its `TableEnvironment` and returns the name. Hence, `Table` objects can be directly inlined into SQL queries (by string concatenation) as shown in the examples below.
 
 **Note:** Flink's SQL support is not yet feature complete. Queries that include unsupported SQL features cause a `TableException`. The supported features of SQL on batch and streaming tables are listed in the following sections.
 
-* This will be replaced by the TOC
-{:toc}
-
-Specifying a Query
-------------------
+### Specifying a Query
 
 The following examples show how to specify SQL queries on registered and inlined tables.
 
@@ -130,8 +131,7 @@ table_env \
 
 {% top %}
 
-Supported Syntax
-----------------
+### Supported Syntax
 
 Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DDL statements are not supported by Flink.
 
@@ -276,10 +276,9 @@ String literals must be enclosed in single quotes (e.g., `SELECT 'Hello World'`)
 
 {% top %}
 
-Operations
---------------------
+### Operations
 
-### Show and Use
+#### Show and Use
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -330,7 +329,7 @@ USE mydatabase;
 </table>
 </div>
 
-### Scan, Projection, and Filter
+#### Scan, Projection, and Filter
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -385,7 +384,7 @@ SELECT PRETTY_PRINT(user) FROM Orders
 
 {% top %}
 
-### Aggregations
+#### Aggregations
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -509,7 +508,7 @@ GROUP BY users
 
 {% top %}
 
-### Joins
+#### Joins
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -655,7 +654,7 @@ WHERE
 
 {% top %}
 
-### Set Operations
+#### Set Operations
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -765,7 +764,7 @@ WHERE product EXISTS (
 
 {% top %}
 
-### OrderBy & Limit
+#### OrderBy & Limit
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -813,7 +812,7 @@ LIMIT 3
 
 {% top %}
 
-### Insert
+#### Insert
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -846,7 +845,7 @@ FROM Orders
 
 {% top %}
 
-### Group Windows
+#### Group Windows
 
 Group windows are defined in the `GROUP BY` clause of a SQL query. Just like queries with regular `GROUP BY` clauses, queries with a `GROUP BY` clause that includes a group window function compute a single result row per group. The following group windows functions are supported for SQL on batch and streaming tables.
 
@@ -874,13 +873,13 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
   </tbody>
 </table>
 
-#### Time Attributes
+##### Time Attributes
 
 For SQL queries on streaming tables, the `time_attr` argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the [documentation of time attributes](streaming/time_attributes.html) to learn how to define time attributes.
 
 For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`.
 
-#### Selecting Group Window Start and End Timestamps
+##### Selecting Group Window Start and End Timestamps
 
 The start and end timestamps of group windows as well as time attributes can be selected with the following auxiliary functions:
 
@@ -1019,7 +1018,7 @@ val result4 = tableEnv.sqlQuery(
 
 {% top %}
 
-### Pattern Recognition
+#### Pattern Recognition
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -1065,8 +1064,119 @@ MATCH_RECOGNIZE (
 
 {% top %}
 
-Data Types
-----------
+## DDL
+
+DDL statements are specified with the `sqlUpdate()` method of the `TableEnvironment`. The method returns nothing for a successful table creation. A `Table` can be registered into the [Catalog](catalogs.html) with a `CREATE TABLE` statement and can then be referenced in SQL queries via the `sqlQuery()` method of the `TableEnvironment`.
+
+**Note:** Flink's DDL support is not yet feature complete. Statements that include unsupported SQL features cause a `TableException`. The supported features of SQL DDL on batch and streaming tables are listed in the following sections.
+
+### Specifying a DDL
+
+The following examples show how to specify a SQL DDL.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// SQL query with a registered table
+// register a table named "Orders"
+tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
+// run a SQL query on the Table and retrieve the result as a new Table
+Table result = tableEnv.sqlQuery(
+  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+
+// SQL update with a registered table
+// register a TableSink
+tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env)
+
+// SQL query with a registered table
+// register a table named "Orders"
+tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
+// run a SQL query on the Table and retrieve the result as a new Table
+val result = tableEnv.sqlQuery(
+  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+
+// SQL update with a registered table
+// register a TableSink
+tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH ('connector.path'='/path/to/file' ...)");
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+# SQL update with a registered table
+# register a TableSink
+table_env.sql_update("CREATE TABLE RubberOrders(product VARCHAR, amount INT) with (...)")
+# run a SQL update query on the Table and emit the result to the TableSink
+table_env \
+    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+### Create Table
+
+{% highlight sql %}
+CREATE TABLE [catalog_name.][db_name.]table_name
+  [(col_name1 col_type1 [COMMENT col_comment1], ...)]
+  [COMMENT table_comment]
+  [PARTITIONED BY (col_name1, col_name2, ...)]
+  WITH (key1=val1, key2=val2, ...)
+{% endhighlight %}
+
+Create a table with the given table properties. If a table with the same name already exists in the database, an exception is thrown.
+
+**PARTITIONED BY**
+
+Partition the created table by the specified columns. A directory is created for each partition if this table is used as a filesystem sink.
+
+**WITH OPTIONS**
+
+Table properties used to create a table source/sink. The properties are usually used to find and create the underlying connector.
+
+The key and value of the expression `key1=val1` should both be string literals. See [Connect to External Systems](connect.html) for the supported table properties of the different connectors.
+
+**Notes:** The table name can be of three formats: 1. `catalog_name.db_name.table_name` 2. `db_name.table_name` 3. `table_name`. For `catalog_name.db_name.table_name`, the table would be registered into metastore with catalog named "catalog_name" and database named "db_name"; for `db_name.table_name`, the table would be registered into the current catalog of the execution table environment and database named "db_name"; for `table_name`, the table would be registered into the current cata [...]
+
+**Notes:** A table registered with a `CREATE TABLE` statement can be used as both a table source and a table sink; whether it acts as a source or a sink is not determined until it is referenced in a DML statement.
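+
+For illustration, here is a minimal sketch that combines the clauses above; the table name, columns, and properties are placeholders rather than a recommended configuration (see [Connect to External Systems](connect.html) for real connector options):
+
+{% highlight sql %}
+CREATE TABLE Orders (
+  `user` BIGINT,
+  product VARCHAR,
+  amount INT
+) COMMENT 'orders from an external system'
+PARTITIONED BY (product)
+WITH (
+  'connector.type' = 'filesystem',             -- a connector documented in "Connect to External Systems"
+  'connector.path' = 'file:///path/to/orders',
+  ...                                          -- remaining format properties omitted here
+)
+{% endhighlight %}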
+
+{% top %}
+
+### Drop Table
+
+{% highlight sql %}
+DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
+{% endhighlight %}
+
+Drop a table with the given table name. If the table to drop does not exist, an exception is thrown.
+
+**IF EXISTS**
+
+If the table does not exist, nothing happens.
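+
+For example (the catalog, database, and table names below are hypothetical):
+
+{% highlight sql %}
+-- removes the table if it exists; does nothing otherwise
+DROP TABLE IF EXISTS mycatalog.mydb.Orders
+{% endhighlight %}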
+
+{% top %}
+
+## Data Types
 
 Please see the dedicated page about [data types](types.html).
 
@@ -1076,10 +1186,13 @@ Fields of composite types with arbitrary nesting can be accessed with [value acc
 
 Generic types are treated as a black box and can be passed on or processed by [user-defined functions](udfs.html).
 
+For DDL statements, the full set of data types defined on the [Data Types]({{ site.baseurl }}/dev/table/types.html) page is supported.
+
+**Notes:** Some of these data types are not yet supported in SQL queries (as cast expressions or literals), e.g. `STRING`, `BYTES`, `TIME(p) WITHOUT TIME ZONE`, `TIME(p) WITH LOCAL TIME ZONE`, `TIMESTAMP(p) WITHOUT TIME ZONE`, `TIMESTAMP(p) WITH LOCAL TIME ZONE`, `ARRAY`, `MULTISET`, `ROW`.
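+
+As an illustration of the notes above, a DDL schema may declare types that cannot yet appear as casts or literals in queries; the table and column names here are hypothetical and the connector properties are elided:
+
+{% highlight sql %}
+CREATE TABLE TypedTable (
+  id BIGINT,
+  payload BYTES,
+  description STRING,
+  event_time TIMESTAMP(3),
+  tags ARRAY<STRING>
+) WITH (...)
+{% endhighlight %}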
+
 {% top %}
 
-Reserved Keywords
------------------
+## Reserved Keywords
 
 Although not every SQL feature is implemented yet, some string combinations are already reserved as keywords for future use. If you want to use one of the following strings as a field name, make sure to surround them with backticks (e.g. `` `value` ``, `` `count` ``).
 
diff --git a/docs/dev/table/sql.zh.md b/docs/dev/table/sql.zh.md
index 4efddfb..fa3b050 100644
--- a/docs/dev/table/sql.zh.md
+++ b/docs/dev/table/sql.zh.md
@@ -22,19 +22,20 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+This is a complete list of Data Definition Language (DDL) and Data Manipulation Language (DML) constructs supported in Flink.
+* This will be replaced by the TOC
+{:toc} 
+
+## Query
 SQL queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream](common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink](common.html#emit-a-table)). SQL and Table API queries can be seamlessly mixed and are holistically optimized and tra [...]
 
-In order to access a table in a SQL query, it must be [registered in the TableEnvironment](common.html#register-tables-in-the-catalog). A table can be registered from a [TableSource](common.html#register-a-tablesource), [Table](common.html#register-a-table), [DataStream, or DataSet](common.html#register-a-datastream-or-dataset-as-table). Alternatively, users can also [register external catalogs in a TableEnvironment](common.html#register-an-external-catalog) to specify the location of th [...]
+In order to access a table in a SQL query, it must be [registered in the TableEnvironment](common.html#register-tables-in-the-catalog). A table can be registered from a [TableSource](common.html#register-a-tablesource), [Table](common.html#register-a-table), [CREATE TABLE statement](#create-table), [DataStream, or DataSet](common.html#register-a-datastream-or-dataset-as-table). Alternatively, users can also [register external catalogs in a TableEnvironment](common.html#register-an-extern [...]
 
 For convenience `Table.toString()` automatically registers the table under a unique name in its `TableEnvironment` and returns the name. Hence, `Table` objects can be directly inlined into SQL queries (by string concatenation) as shown in the examples below.
 
 **Note:** Flink's SQL support is not yet feature complete. Queries that include unsupported SQL features cause a `TableException`. The supported features of SQL on batch and streaming tables are listed in the following sections.
 
-* This will be replaced by the TOC
-{:toc}
-
-Specifying a Query
-------------------
+### Specifying a Query
 
 The following examples show how to specify SQL queries on registered and inlined tables.
 
@@ -130,8 +131,7 @@ table_env \
 
 {% top %}
 
-Supported Syntax
-----------------
+### Supported Syntax
 
 Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DDL statements are not supported by Flink.
 
@@ -276,10 +276,9 @@ String literals must be enclosed in single quotes (e.g., `SELECT 'Hello World'`)
 
 {% top %}
 
-Operations
---------------------
+### Operations
 
-### Show and Use
+#### Show and Use
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -385,7 +384,7 @@ SELECT PRETTY_PRINT(user) FROM Orders
 
 {% top %}
 
-### Aggregations
+#### Aggregations
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -509,7 +508,7 @@ GROUP BY users
 
 {% top %}
 
-### Joins
+#### Joins
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -655,7 +654,7 @@ WHERE
 
 {% top %}
 
-### Set Operations
+#### Set Operations
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -765,7 +764,7 @@ WHERE product EXISTS (
 
 {% top %}
 
-### OrderBy & Limit
+#### OrderBy & Limit
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -797,7 +796,7 @@ ORDER BY orderTime
         <span class="label label-primary">Batch</span>
       </td>
       <td>
-<b>Note:</b> The LIMIT clause requires an ORDER BY clause.
+<b>Note:</b> The LIMIT clause requires an ORDER BY clause. 
 {% highlight sql %}
 SELECT *
 FROM Orders
@@ -813,7 +812,7 @@ LIMIT 3
 
 {% top %}
 
-### Insert
+#### Insert
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -846,7 +845,7 @@ FROM Orders
 
 {% top %}
 
-### Group Windows
+#### Group Windows
 
 Group windows are defined in the `GROUP BY` clause of a SQL query. Just like queries with regular `GROUP BY` clauses, queries with a `GROUP BY` clause that includes a group window function compute a single result row per group. The following group windows functions are supported for SQL on batch and streaming tables.
 
@@ -874,13 +873,13 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
   </tbody>
 </table>
 
-#### Time Attributes
+##### Time Attributes
 
 For SQL queries on streaming tables, the `time_attr` argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the [documentation of time attributes](streaming/time_attributes.html) to learn how to define time attributes.
 
 For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`.
 
-#### Selecting Group Window Start and End Timestamps
+##### Selecting Group Window Start and End Timestamps
 
 The start and end timestamps of group windows as well as time attributes can be selected with the following auxiliary functions:
 
@@ -1019,7 +1018,7 @@ val result4 = tableEnv.sqlQuery(
 
 {% top %}
 
-### Pattern Recognition
+#### Pattern Recognition
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -1065,8 +1064,133 @@ MATCH_RECOGNIZE (
 
 {% top %}
 
-Data Types
-----------
+## DDL
+
+DDL statements are specified with the `sqlUpdate()` method of the `TableEnvironment`. The method returns nothing for a successful table creation. A `Table` can be registered into the [Catalog](catalogs.html) with a `CREATE TABLE` statement and can then be referenced in SQL queries via the `sqlQuery()` method of the `TableEnvironment`.
+
+**Note:** Flink's DDL support is not yet feature complete. Statements that include unsupported SQL features cause a `TableException`. The supported features of SQL DDL on batch and streaming tables are listed in the following sections.
+
+### Specifying a DDL
+
+The following examples show how to specify a SQL DDL.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// SQL query with a registered table
+// register a table named "Orders"
+tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
+// run a SQL query on the Table and retrieve the result as a new Table
+Table result = tableEnv.sqlQuery(
+  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+
+// SQL update with a registered table
+// register a TableSink
+tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env)
+
+// SQL query with a registered table
+// register a table named "Orders"
+tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
+// run a SQL query on the Table and retrieve the result as a new Table
+val result = tableEnv.sqlQuery(
+  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+
+// SQL update with a registered table
+// register a TableSink
+tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH ('connector.path'='/path/to/file' ...)");
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+# SQL update with a registered table
+# register a TableSink
+table_env.sql_update("CREATE TABLE RubberOrders(product VARCHAR, amount INT) with (...)")
+# run a SQL update query on the Table and emit the result to the TableSink
+table_env \
+    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+### Create Table
+
+{% highlight sql %}
+CREATE TABLE [catalog_name.][db_name.]table_name
+  [(col_name1 col_type1 [COMMENT col_comment1], ...)]
+  [COMMENT table_comment]
+  [PARTITIONED BY (col_name1, col_name2, ...)]
+  WITH (key1=val1, key2=val2, ...)
+{% endhighlight %}
+
+Create a table with the given table properties. If a table with the same name already exists in the database, an exception is thrown.
+
+**PARTITIONED BY**
+
+Partition the created table by the specified columns. A directory is created for each partition if this table is used as a filesystem sink.
+
+**WITH OPTIONS**
+
+Table properties used to create a table source/sink. The properties are usually used to find and create the underlying connector.
+
+The key and value of the expression `key1=val1` should both be string literals. See [Connect to External Systems](connect.html) for the supported table properties of the different connectors.
+
+**Notes:** The table name can be of three formats: 1. `catalog_name.db_name.table_name` 2. `db_name.table_name` 3. `table_name`. For `catalog_name.db_name.table_name`, the table would be registered into metastore with catalog named "catalog_name" and database named "db_name"; for `db_name.table_name`, the table would be registered into the current catalog of the execution table environment and database named "db_name"; for `table_name`, the table would be registered into the current cata [...]
+
+**Notes:** A table registered with a `CREATE TABLE` statement can be used as both a table source and a table sink; whether it acts as a source or a sink is not determined until it is referenced in a DML statement.
+
+{% top %}
+
+### Drop Table
+
+{% highlight sql %}
+DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
+{% endhighlight %}
+
+Drop a table with the given table name. If the table to drop does not exist, an exception is thrown.
+
+**IF EXISTS**
+
+If the table does not exist, nothing happens.
+
+{% top %}
+
+## Data Types
 
 Please see the dedicated page about [data types](types.html).
 
@@ -1076,10 +1200,13 @@ Fields of composite types with arbitrary nesting can be accessed with [value acc
 
 Generic types are treated as a black box and can be passed on or processed by [user-defined functions](udfs.html).
 
+For DDL statements, the full set of data types defined on the [Data Types]({{ site.baseurl }}/dev/table/types.html) page is supported.
+
+**Notes:** Some of these data types are not yet supported in SQL queries (as cast expressions or literals), e.g. `STRING`, `BYTES`, `TIME(p) WITHOUT TIME ZONE`, `TIME(p) WITH LOCAL TIME ZONE`, `TIMESTAMP(p) WITHOUT TIME ZONE`, `TIMESTAMP(p) WITH LOCAL TIME ZONE`, `ARRAY`, `MULTISET`, `ROW`.
+
 {% top %}
 
-Reserved Keywords
------------------
+## Reserved Keywords
 
 Although not every SQL feature is implemented yet, some string combinations are already reserved as keywords for future use. If you want to use one of the following strings as a field name, make sure to surround them with backticks (e.g. `` `value` ``, `` `count` ``).