Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/27 07:06:40 UTC

[GitHub] [flink] rmetzger commented on a change in pull request #13770: [FLINK-18858][connector-kinesis] Add Kinesis sources and sinks

rmetzger commented on a change in pull request #13770:
URL: https://github.com/apache/flink/pull/13770#discussion_r512456345



##########
File path: docs/dev/table/connectors/kinesis.md
##########
@@ -0,0 +1,270 @@
+---
+title: "Amazon Kinesis Data Streams SQL Connector"
+nav-title: Kinesis
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Append Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Kinesis connector allows for reading data from and writing data into [Amazon Kinesis Data Streams (KDS)](https://aws.amazon.com/kinesis/data-streams/).
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kinesis{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+How to create a Kinesis data stream table
+-----------------------------------------
+
+Follow the instructions from the [Amazon KDS Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to set up a Kinesis stream.
+The following example shows how to create a table backed by a Kinesis data stream:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ category_id BIGINT,
+ behavior STRING,
+ ts TIMESTAMP(3)
+) WITH (
+ 'connector' = 'kinesis',
+ 'stream' = 'user_behavior',
+ 'properties.aws.region' = 'us-east-2',
+ 'properties.flink.stream.initpos' = 'LATEST',
+ 'format' = 'csv'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>stream</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Kinesis data stream backing this table.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.partitioner</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">random or row-based</td>
+      <td>String</td>
+      <td>Optional output partitioning from Flink's partitions into Kinesis shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.aws.region</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the stream is defined. Either this option or <code>properties.aws.endpoint</code> is required.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.aws.endpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this option or <code>properties.aws.region</code> is required.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.flink.stream.initpos</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">LATEST</td>
+      <td>String</td>
+      <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.flink.stream.recordpublisher</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">POLLING</td>
+      <td>String</td>
+      <td>The <code>RecordPublisher</code> type to use for sources. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.*</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>
+      Other properties to pass to the <code>FlinkKinesisConsumer</code> or <code>FlinkKinesisProducer</code> constructors.
+      See the constants defined in 
+      <ul>
+        <li><code>AWSConfigConstants</code>,</li> 
+        <li><code>ConsumerConfigConstants</code>, and</li> 
+        <li><code>ProducerConfigConstants</code></li>
+      </ul>
+      for detailed information on the available options.
+      </td>
+    </tr>
+    </tbody>
+</table>
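+
+The following DDL is an illustrative sketch of how several of these options combine. The column definitions, stream name, and region are placeholders carried over from the example above, and the values chosen for the optional settings are just one valid combination.
+
+{% highlight sql %}
+CREATE TABLE KinesisExampleTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  behavior STRING
+) WITH (
+  -- use the Kinesis connector
+  'connector' = 'kinesis',
+  -- name of the backing Kinesis data stream
+  'stream' = 'user_behavior',
+  -- records are (de)serialized as CSV
+  'format' = 'csv',
+  -- region in which the stream is defined
+  'properties.aws.region' = 'us-east-2',
+  -- start reading from the earliest record still retained by the stream
+  'properties.flink.stream.initpos' = 'TRIM_HORIZON',
+  -- derive the Kinesis PartitionKey from the Flink subtask index
+  'sink.partitioner' = 'fixed'
+)
+{% endhighlight %}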
+
+Features
+----------------
+
+### Authorization
+
+Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html) to allow reading from / writing to the Kinesis data streams.
+
+### Authentication
+
+Depending on your deployment, you will want to choose a suitable Credentials Provider to allow access to Kinesis.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider.  
+
+A specific Credentials Provider can be **optionally** set using the `properties.aws.credentials.provider` setting.
+Supported values are:
+
+* `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
+* `BASIC` - Use access key ID and secret key supplied as configuration. 
+* `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+* `SYS_PROP` - Use Java system properties `aws.accessKeyId` and `aws.secretKey`.
+* `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token. 
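+
+The following is a minimal sketch of selecting the `BASIC` provider directly in the table definition. The exact property keys for the access key ID and secret key are an assumption based on the constants in `AWSConfigConstants` and may differ; all values shown are placeholders.
+
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+  user_id BIGINT,
+  behavior STRING
+) WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'format' = 'csv',
+  'properties.aws.region' = 'us-east-2',
+  -- explicitly select the BASIC credentials provider
+  'properties.aws.credentials.provider' = 'BASIC',
+  -- assumed keys for the static credentials (check AWSConfigConstants)
+  'properties.aws.credentials.provider.basic.accesskeyid' = '<access-key-id>',
+  'properties.aws.credentials.provider.basic.secretkey' = '<secret-key>'
+)
+{% endhighlight %}
+
+For most deployments the default `AUTO` provider chain is sufficient; an explicit provider is only needed when the automatic lookup does not match your environment.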
+
+### Start Reading Position
+
+You can configure the `FlinkKinesisConsumer` to start reading the table-backing Kinesis data stream from a specific position through the `properties.flink.stream.initpos` option.
+Available values are:
+
+* `LATEST`: read all shards starting from the latest record.
+* `TRIM_HORIZON`: read all shards starting from the earliest record possible (data may be trimmed by Kinesis depending on the current retention settings of the backing stream).
+* `AT_TIMESTAMP`: read all shards starting from a specified timestamp. The timestamp must also be specified in the configuration
+properties by providing a value for `properties.flink.stream.initpos.timestamp`, in one of the following date patterns:
+   * A non-negative double value representing the number of seconds that have elapsed since the Unix epoch (for example, `1459799926.480`).
+   * A user defined pattern, which is a valid pattern for the `SimpleDateFormat` provided by `properties.flink.stream.initpos.timestamp.format`.
+     If a user-defined format is not supplied, the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`.
+     For example, a timestamp value of `2016-04-04` works with the user-defined format `yyyy-MM-dd`, while a timestamp value of `2016-04-04T19:58:46.480-00:00` works when no user-defined format is provided.
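+
+For example, the following table definition is a sketch of starting a read at a specific point in time with `AT_TIMESTAMP`, using the default timestamp pattern described above; the stream name, region, and timestamp are placeholders.
+
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+  user_id BIGINT,
+  behavior STRING
+) WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'format' = 'csv',
+  'properties.aws.region' = 'us-east-2',
+  -- start reading all shards from the given point in time
+  'properties.flink.stream.initpos' = 'AT_TIMESTAMP',
+  -- interpreted with the default pattern yyyy-MM-dd'T'HH:mm:ss.SSSXXX
+  'properties.flink.stream.initpos.timestamp' = '2016-04-04T19:58:46.480-00:00'
+)
+{% endhighlight %}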
+
+### Sink Partitioning
+
+Kinesis data streams consist of one or more shards, and the `sink.partitioner` option allows you to control how the records written into a multi-shard Kinesis-backed table are partitioned between its shards.
+Valid values are:
+
+* `fixed`: Kinesis `PartitionKey` values are derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis partition (assuming that no re-sharding takes place at runtime).
+* `random`: Kinesis `PartitionKey`</code>` values are assigned randomly.

Review comment:
       ```suggestion
   * `random`: Kinesis `PartitionKey` values are assigned randomly.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org