Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/01 13:24:00 UTC

[jira] [Commented] (FLINK-9947) Document unified table sources/sinks/formats

    [ https://issues.apache.org/jira/browse/FLINK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565310#comment-16565310 ] 

ASF GitHub Bot commented on FLINK-9947:
---------------------------------------

twalthr commented on a change in pull request #6456: [FLINK-9947] [docs] Document unified table sources/sinks/formats
URL: https://github.com/apache/flink/pull/6456#discussion_r206876401
 
 

 ##########
 File path: docs/dev/table/connect.md
 ##########
 @@ -0,0 +1,1033 @@
+---
+title: "Connect to External Systems"
+nav-parent_id: tableapi
+nav-pos: 19
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink's Table API & SQL programs can be connected to external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Parquet, or ORC.
+
+This page describes how to declare built-in table sources and/or table sinks and register them in Flink. After a source, sink, or both have been registered, they can be accessed by Table API & SQL queries.
+
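+For example, once a table source has been registered under a name (such as `MyUserTable` in the example further down this page), it can be referenced directly in queries. The following lines are only a minimal sketch that assumes an existing `tableEnvironment` and a registered table:
+
+{% highlight java %}
+// query a previously registered table source by its name
+Table result = tableEnvironment.sqlQuery("SELECT user, message FROM MyUserTable");
+{% endhighlight %}
+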
+<span class="label label-danger">Attention</span> If you want to implement your own *custom* table source or sink, have a look at the [user-defined sources & sinks page](sourceSinks.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+Dependencies
+------------
+
+The following tables list all available connectors and formats. Their mutual compatibility is tagged in the corresponding sections for [table connectors](connect.html#table-connectors) and [table formats](connect.html#table-formats). The tables also provide dependency information for projects using a build automation tool (such as Maven or SBT) as well as for the SQL Client with SQL JAR bundles.
+
+{% if site.is_stable %}
+
+### Connectors
+
+| Name              | Version       | Maven dependency             | SQL Client JAR         |
+| :---------------- | :------------ | :--------------------------- | :----------------------|
+| Filesystem        |               | Built-in                     | Built-in               |
+| Apache Kafka      | 0.8           | `flink-connector-kafka-0.8`  | Not available          |
+| Apache Kafka      | 0.9           | `flink-connector-kafka-0.9`  | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+| Apache Kafka      | 0.10          | `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+| Apache Kafka      | 0.11          | `flink-connector-kafka-0.11` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+
+### Formats
+
+| Name              | Maven dependency             | SQL Client JAR         |
+| :---------------- | :--------------------------- | :--------------------- |
+| CSV               | Built-in                     | Built-in               |
+| JSON              | `flink-json`                 | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) |
+| Apache Avro       | `flink-avro`                 | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) |
+
+{% else %}
+
+These tables are only available for stable releases.
+
+{% endif %}
+
+{% top %}
+
+Overview
+--------
+
+Beginning with Flink 1.6, the declaration of a connection to an external system is separated from the actual implementation. Connections can be specified either
+
+- **programmatically** using a `Descriptor` under `org.apache.flink.table.descriptors` for Table & SQL API
+- or **declaratively** via [YAML configuration files](http://yaml.org/) for the SQL Client.
+
+This allows not only for a better unification of the APIs and the SQL Client but also for better extensibility in case of [custom implementations](sourceSinks.html), without changing the actual declaration.
+
+Similar to a SQL `CREATE TABLE` statement, one can define the table's name, its final schema, a connector, and a data format upfront for connecting to an external system. Additionally, the table's type (source, sink, or both) and an update mode for streaming queries can be specified:
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+tableEnvironment
+  .connect(...)
+  .withFormat(...)
+  .withSchema(...)
+  .inAppendMode()
+  .registerTableSource(...)
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+name: MyTable
+type: source
+update-mode: append
+schema: ...
+format: ...
+connector: ...
+{% endhighlight %}
+</div>
+</div>
+
+The subsequent sections will cover each definition part ([schema](connect.html#table-schema), [connector](connect.html#table-connectors), [format](connect.html#table-formats), and [update mode](connect.html#update-modes)) in more detail.
+
+The following code shows a full example of how to connect to Kafka for reading Avro records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+tableEnvironment
+  // declare the external system to connect to
+  .connect(
+    new Kafka()
+      .version("0.10")
+      .topic("test-input")
+      .startFromEarliest()
+      .property("zookeeper.connect", "localhost:2181")
+      .property("bootstrap.servers", "localhost:9092")
+  )
+
+  // declare a format for this system
+  .withFormat(
+    new Avro()
+      .avroSchema(
+        "{" +
+        "  \"namespace\": \"org.myorganization\"," +
+        "  \"type\": \"record\"," +
+        "  \"name\": \"UserMessage\"," +
+        "    \"fields\": [" +
+        "      {\"name\": \"timestamp\", \"type\": \"string\"}," +
+        "      {\"name\": \"user\", \"type\": \"long\"}," +
+        "      {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
+        "    ]" +
+        "}" +
+      )
+  )
+
+  // declare the final schema of the table
+  .withSchema(
+    new Schema()
+      .field("rowtime", Types.SQL_TIMESTAMP)
+        .rowtime(new Rowtime()
+          .timestampsFromField("ts")
+          .watermarksPeriodicBounded(60000)
+        )
+      .field("user", Types.LONG)
+      .field("message", Types.STRING)
+  )
+
+  // specify the update-mode for streaming tables
+  .inAppendMode()
+
+  // register as source, sink, or both and under a name
+  .registerTableSource("MyUserTable");
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+tables:
+  - name: MyUserTable      # name the new table
+    type: source           # declare if the table should be "source", "sink", or "both"
+    update-mode: append    # specify the update-mode for streaming tables
+
+    # declare the final schema of the table
+    schema:
+      - name: rowtime
+        type: TIMESTAMP
+        rowtime:
+          timestamps:
+            type: from-field
+            from: ts
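+          # generate periodic watermarks that lag at most 60000 ms (1 minute)
+          # behind the largest observed rowtime value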
+          watermarks:
+            type: periodic-bounded
+            delay: "60000"
+      - name: user
+        type: BIGINT
+      - name: message
+        type: VARCHAR
+
+    # declare a format for this system
+    format:
+      type: avro
+      avro-schema: >
+        {
+          "namespace": "org.myorganization",
+          "type": "record",
+          "name": "UserMessage",
+            "fields": [
+              {"name": "ts", "type": "string"},
+              {"name": "user", "type": "long"},
+              {"name": "message", "type": ["string", "null"]}
+            ]
+        }
+
+    # declare the external system to connect to
+    connector:
+      type: kafka
+      version: "0.10"
+      topic: test-input
+      startup-mode: earliest-offset
+      properties:
+        - key: zookeeper.connect
+          value: localhost:2181
+        - key: bootstrap.servers
+          value: localhost:9092
+{% endhighlight %}
+</div>
+</div>
+
+In both ways, the desired connection properties are converted into normalized, string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly one matching table factory.
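+
+For illustration, the Kafka/Avro declaration above could roughly translate into a flat property map along the following lines. This is only a sketch; the key names shown are assumptions for illustration and not necessarily the exact properties used internally:
+
+{% highlight java %}
+import java.util.HashMap;
+import java.util.Map;
+
+// hypothetical sketch of normalized key-value pairs (key names are assumptions)
+Map<String, String> properties = new HashMap<>();
+properties.put("connector.type", "kafka");
+properties.put("connector.version", "0.10");
+properties.put("connector.topic", "test-input");
+properties.put("format.type", "avro");
+properties.put("update-mode", "append");
+{% endhighlight %}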
 
 Review comment:
   Yes, they are important when a factory cannot be resolved. The exception will contain the provided properties. I wanted to quickly mention the underlying mechanism in the overview.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Document unified table sources/sinks/formats
> --------------------------------------------
>
>                 Key: FLINK-9947
>                 URL: https://issues.apache.org/jira/browse/FLINK-9947
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>
> The recent unification of table sources/sinks/formats needs documentation. I propose a new page that explains the built-in sources, sinks, and formats as well as a page for customization of public interfaces.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)