You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/26 14:00:24 UTC
[flink-statefun] 06/13: [FLINK-16758][docs] Port I/O Module content
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 200362bd2c132eb7fbb4febc2b31daa9df33dbb0
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Mar 24 17:07:48 2020 -0500
[FLINK-16758][docs] Port I/O Module content
---
docs/io-module/apache-kafka.md | 421 +++++++++++++++++++++++++++++++++
docs/io-module/aws-kinesis.md | 465 +++++++++++++++++++++++++++++++++++++
docs/io-module/flink-connectors.md | 95 ++++++++
docs/io-module/index.md | 243 +++++++++++++++++++
4 files changed, 1224 insertions(+)
diff --git a/docs/io-module/apache-kafka.md b/docs/io-module/apache-kafka.md
new file mode 100644
index 0000000..762fd27
--- /dev/null
+++ b/docs/io-module/apache-kafka.md
@@ -0,0 +1,421 @@
+---
+title: Apache Kafka
+nav-id: apache-kafka
+nav-pos: 1
+nav-title: Apache Kafka
+nav-parent_id: io-module
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics.
+It is based on Apache Flink's universal [Kafka connector](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html) and provides exactly-once processing semantics.
+The Kafka I/O Module is configurable in Yaml or Java.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Dependency
+
+To use the Kafka I/O Module in Java, please include the following dependency in your pom.
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-kafka-io</artifactId>
+ <version>{{ site.version }}</version>
+ <scope>provided</scope>
+</dependency>
+{% endhighlight %}
+
+## Kafka Ingress Spec
+
+A ``KafkaIngressSpec`` declares an ingress spec for consuming from a Kafka cluster.
+
+It accepts the following arguments:
+
+1. The ingress identifier associated with this ingress
+2. The topic name / list of topic names
+3. The address of the bootstrap servers
+4. The consumer group id to use
+5. A ``KafkaIngressDeserializer`` for deserializing data from Kafka (Java only)
+6. The position to start consuming from
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kafka;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
+
+public class IngressSpecs {
+
+ public static final IngressIdentifier<User> ID =
+ new IngressIdentifier<>(User.class, "example", "input-ingress");
+
+ public static final IngressSpec<User> kafkaIngress =
+ KafkaIngressBuilder.forIdentifier(ID)
+ .withKafkaAddress("localhost:9092")
+ .withConsumerGroupId("greetings")
+ .withTopic("my-topic")
+ .withDeserializer(UserDeserializer.class)
+ .withStartupPosition(KafkaIngressStartupPosition.fromLatest())
+ .build();
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+ meta:
+ type: remote
+spec:
+ ingresses:
+ - ingress:
+ meta:
+ type: statefun.kafka.io/routable-protobuf-ingress
+ id: example/user-ingress
+ spec:
+ address: kafka-broker:9092
+ consumerGroupId: routable-kafka-e2e
+ startupPosition:
+ type: earliest
+ topics:
+ - topic: messages-1
+ typeUrl: com.googleapis/org.apache.flink.statefun.docs.models.User
+ targets:
+ - example-namespace/my-function-1
+ - example-namespace/my-function-2
+{% endhighlight %}
+</div>
+</div>
+
+The ingress also accepts properties to directly configure the Kafka client, using ``KafkaIngressBuilder#withProperties(Properties)``.
+Please refer to the Kafka [consumer configuration](https://docs.confluent.io/current/installation/configuration/consumer-configs.html) documentation for the full list of available properties.
+Note that configuration passed using named methods, such as ``KafkaIngressBuilder#withConsumerGroupId(String)``, will have higher precedence and overwrite their respective settings in the provided properties.
+
+### Startup Position
+
+The ingress allows configuring the startup position to be one of the following:
+
+#### From Group Offset (default)
+
+Starts from offsets that were committed to Kafka for the specified consumer group.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaIngressStartupPosition#fromGroupOffsets();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+ type: group-offsets
+{% endhighlight %}
+</div>
+</div>
+
+#### Earliest
+
+Starts from the earliest offset.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaIngressStartupPosition#fromEarliest();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+ type: earliest
+{% endhighlight %}
+</div>
+</div>
+
+#### Latest
+
+Starts from the latest offset.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaIngressStartupPosition#fromLatest();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+ type: latest
+{% endhighlight %}
+</div>
+</div>
+
+#### Specific Offsets
+
+Starts from specific offsets, defined as a map of partitions to their target starting offset.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Map<TopicPartition, Long> offsets = new HashMap<>();
+offsets.put(new TopicPartition("user-topic", 0), 91L);
+offsets.put(new TopicPartition("user-topic", 1), 11L);
+offsets.put(new TopicPartition("user-topic", 2), 8L);
+
+KafkaIngressStartupPosition#fromSpecificOffsets(offsets);
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+ type: specific-offsets
+ offsets:
+ - user-topic/0: 91
+ - user-topic/1: 11
+ - user-topic/2: 8
+{% endhighlight %}
+</div>
+</div>
+
+#### Date
+
+Starts from offsets that have an ingestion time larger than or equal to a specified date.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaIngressStartupPosition#fromDate(ZonedDateTime.now());
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+ type: date
+ date: 2020-02-01 04:15:00.00 Z
+{% endhighlight %}
+</div>
+</div>
+
+On startup, if the specified startup offset for a partition is out-of-range or does not exist (which may be the case if the ingress is configured to start from group offsets, specific offsets, or from a date), then the ingress will fall back to using the position configured using ``KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)``.
+By default, this is set to be the latest position.
+
+### Kafka Deserializer
+
+When using the Java API, the Kafka ingress needs to know how to turn the binary data in Kafka into Java objects.
+The ``KafkaIngressDeserializer`` allows users to specify such a schema.
+The ``T deserialize(ConsumerRecord<byte[], byte[]> record)`` method gets called for each Kafka message, passing the key, value, and metadata from Kafka.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UserDeserializer implements KafkaIngressDeserializer<User> {
+
+ private static Logger LOG = LoggerFactory.getLogger(UserDeserializer.class);
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ @Override
+ public User deserialize(ConsumerRecord<byte[], byte[]> input) {
+ try {
+ return mapper.readValue(input.value(), User.class);
+ } catch (IOException e) {
+ LOG.debug("Failed to deserialize record", e);
+ return null;
+ }
+ }
+}
+{% endhighlight %}
+
+## Kafka Egress Spec
+
+A ``KafkaEgressBuilder`` declares an egress spec for writing data out to a Kafka cluster.
+
+It accepts the following arguments:
+
+1. The egress identifier associated with this egress
+2. The address of the bootstrap servers
+3. A ``KafkaEgressSerializer`` for serializing data into Kafka (Java only)
+4. The fault tolerance semantic
+5. Properties for the Kafka producer
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kafka;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
+
+public class EgressSpecs {
+
+ public static final EgressIdentifier<User> ID =
+ new EgressIdentifier<>("example", "output-egress", User.class);
+
+ public static final EgressSpec<User> kafkaEgress =
+ KafkaEgressBuilder.forIdentifier(ID)
+ .withKafkaAddress("localhost:9092")
+ .withSerializer(UserSerializer.class)
+ .build();
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+ meta:
+ type: remote
+spec:
+ egresses:
+ - egress:
+ meta:
+ type: statefun.kafka.io/generic-egress
+ id: example/output-messages
+ spec:
+ address: kafka-broker:9092
+ deliverySemantic:
+ type: exactly-once
+ transactionTimeoutMillis: 100000
+ properties:
+ - foo.config: bar
+{% endhighlight %}
+</div>
+</div>
+
+Please refer to the Kafka [producer configuration](https://docs.confluent.io/current/installation/configuration/producer-configs.html) documentation for the full list of available properties.
+
+### Kafka Egress and Fault Tolerance
+
+With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees.
+You can choose from three different modes of operation.
+
+#### None
+
+Nothing is guaranteed, produced records can be lost or duplicated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaEgressBuilder#withNoProducerSemantics();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+deliverySemantic:
+ type: none
+{% endhighlight %}
+</div>
+</div>
+
+#### At Least Once
+
+Stateful Functions will guarantee that no records will be lost but they can be duplicated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaEgressBuilder#withAtLeastOnceProducerSemantics();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+deliverySemantic:
+ type: at-least-once
+{% endhighlight %}
+</div>
+</div>
+
+#### Exactly Once
+
+Stateful Functions uses Kafka transactions to provide exactly-once semantics.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaEgressBuilder#withExactlyOnceProducerSemantics(Duration.ofMinutes(15));
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+deliverySemantic:
+ type: exactly-once
+ transactionTimeoutMillis: 900000 # 15 min
+{% endhighlight %}
+</div>
+</div>
+
+### Kafka Serializer
+
+When using the Java API, the Kafka egress needs to know how to turn Java objects into binary data.
+The ``KafkaEgressSerializer`` allows users to specify such a schema.
+The ``ProducerRecord<byte[], byte[]> serialize(T out)`` method gets called for each message, allowing users to set a key, value, and other metadata.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kafka;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UserSerializer implements KafkaEgressSerializer<User> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UserSerializer.class);
+
+ private static final String TOPIC = "user-topic";
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(User user) {
+ try {
+ byte[] key = user.getUserId().getBytes();
+ byte[] value = mapper.writeValueAsBytes(user);
+
+ return new ProducerRecord<>(TOPIC, key, value);
+ } catch (JsonProcessingException e) {
+ LOG.info("Failed to serializer user", e);
+ return null;
+ }
+ }
+}
+{% endhighlight %}
\ No newline at end of file
diff --git a/docs/io-module/aws-kinesis.md b/docs/io-module/aws-kinesis.md
new file mode 100644
index 0000000..65e7d51
--- /dev/null
+++ b/docs/io-module/aws-kinesis.md
@@ -0,0 +1,465 @@
+---
+title: AWS Kinesis
+nav-id: aws-kinesis
+nav-pos: 2
+nav-title: AWS Kinesis
+nav-parent_id: io-module
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Stateful Functions offers an AWS Kinesis I/O Module for reading from and writing to Kinesis streams.
+It is based on Apache Flink's [Kinesis connector](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kinesis.html).
+The Kinesis I/O Module is configurable in Yaml or Java.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Dependency
+
+To use the Kinesis I/O Module in Java, please include the following dependency in your pom.
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-kinesis-io</artifactId>
+ <version>{{ site.version }}</version>
+ <scope>provided</scope>
+</dependency>
+{% endhighlight %}
+
+## Kinesis Ingress Spec
+
+A ``KinesisIngressSpec`` declares an ingress spec for consuming from a Kinesis stream.
+
+It accepts the following arguments:
+
+1. The AWS region
+2. An AWS credentials provider
+3. A ``KinesisIngressDeserializer`` for deserializing data from Kinesis (Java only)
+4. The stream start position
+5. Properties for the Kinesis client
+6. The name of the stream to consume from
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition;
+
+public class IngressSpecs {
+
+ public static final IngressIdentifier<User> ID =
+ new IngressIdentifier<>(User.class, "example", "input-ingress");
+
+ public static final IngressSpec<User> kinesisIngress =
+ KinesisIngressBuilder.forIdentifier(ID)
+ .withAwsRegion("us-west-1")
+ .withAwsCredentials(AwsCredentials.fromDefaultProviderChain())
+ .withDeserializer(UserDeserializer.class)
+ .withStream("stream-name")
+ .withStartupPosition(KinesisIngressStartupPosition.fromEarliest())
+ .withClientConfigurationProperty("key", "value")
+ .build();
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+ meta:
+ type: remote
+ spec:
+ ingresses:
+ - ingress:
+ meta:
+ type: statefun.kinesis.io/routable-protobuf-ingress
+ id: example-namespace/messages
+ spec:
+ awsRegion:
+ type: specific
+ id: us-west-1
+ awsCredentials:
+ type: basic
+ accessKeyId: my_access_key_id
+ secretAccessKey: my_secret_access_key
+ startupPosition:
+ type: earliest
+ streams:
+ - stream: stream-1
+ typeUrl: com.googleapis/org.apache.flink.statefun.docs.models.User
+ targets:
+ - example-namespace/my-function-1
+ - example-namespace/my-function-2
+ - stream: stream-2
+ typeUrl: com.googleapis/org.apache.flink.statefun.docs.models.User
+ targets:
+ - example-namespace/my-function-1
+ clientConfigProperties:
+ - SocketTimeout: 9999
+ - MaxConnections: 15
+{% endhighlight %}
+</div>
+</div>
+
+The ingress also accepts properties to directly configure the Kinesis client, using ``KinesisIngressBuilder#withClientConfigurationProperty()``.
+Please refer to the Kinesis [client configuration](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) documentation for the full list of available properties.
+Note that configuration passed using named methods will have higher precedence and overwrite their respective settings in the provided properties.
+
+### Startup Position
+
+The ingress allows configuring the startup position to be one of the following:
+
+#### Latest (default)
+
+Start consuming from the latest position, i.e. head of the stream shards.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KinesisIngressStartupPosition#fromLatest();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+ type: latest
+{% endhighlight %}
+</div>
+</div>
+
+#### Earliest
+
+Start consuming from the earliest position possible.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KinesisIngressStartupPosition#fromEarliest();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+ type: earliest
+{% endhighlight %}
+</div>
+</div>
+
+#### Date
+
+Starts from offsets that have an ingestion time larger than or equal to a specified date.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KinesisIngressStartupPosition#fromDate(ZonedDateTime.now());
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+ type: date
+ date: 2020-02-01 04:15:00.00 Z
+{% endhighlight %}
+</div>
+</div>
+
+### Kinesis Deserializer
+
+The Kinesis ingress needs to know how to turn the binary data in Kinesis into Java objects.
+The ``KinesisIngressDeserializer`` allows users to specify such a schema.
+The ``T deserialize(IngressRecord ingressRecord)`` method gets called for each Kinesis record, passing the binary data and metadata from Kinesis.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UserDeserializer implements KinesisIngressDeserializer<User> {
+
+ private static Logger LOG = LoggerFactory.getLogger(UserDeserializer.class);
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ @Override
+ public User deserialize(IngressRecord ingressRecord) {
+ try {
+ return mapper.readValue(ingressRecord.getData(), User.class);
+ } catch (IOException e) {
+ LOG.debug("Failed to deserialize record", e);
+ return null;
+ }
+ }
+}
+{% endhighlight %}
+
+## Kinesis Egress Spec
+
+A ``KinesisEgressBuilder`` declares an egress spec for writing data out to a Kinesis stream.
+
+It accepts the following arguments:
+
+1. The egress identifier associated with this egress
+2. The AWS credentials provider
+3. A ``KinesisEgressSerializer`` for serializing data into Kinesis (Java only)
+4. The AWS region
+5. Properties for the Kinesis client
+6. The number of max outstanding records before backpressure is applied
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
+
+public class EgressSpecs {
+
+ public static final EgressIdentifier<User> ID =
+ new EgressIdentifier<>("example", "output-egress", User.class);
+
+ public static final EgressSpec<User> kinesisEgress =
+ KinesisEgressBuilder.forIdentifier(ID)
+ .withAwsCredentials(AwsCredentials.fromDefaultProviderChain())
+ .withAwsRegion("us-west-1")
+ .withMaxOutstandingRecords(100)
+ .withClientConfigurationProperty("key", "value")
+ .withSerializer(UserSerializer.class)
+ .build();
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+ meta:
+ type: remote
+ spec:
+ egresses:
+ - egress:
+ meta:
+ type: statefun.kinesis.io/generic-egress
+ id: example/output-messages
+ spec:
+ awsRegion:
+ type: custom-endpoint
+ endpoint: https://localhost:4567
+ id: us-west-1
+ awsCredentials:
+ type: profile
+ profileName: john-doe
+ profilePath: /path/to/profile/config
+ maxOutstandingRecords: 9999
+ clientConfigProperties:
+ - ThreadingModel: POOLED
+ - ThreadPoolSize: 10
+{% endhighlight %}
+</div>
+</div>
+
+Please refer to the Kinesis [producer default configuration properties](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties) documentation for the full list of available properties.
+
+### Kinesis Serializer
+
+The Kinesis egress needs to know how to turn Java objects into binary data.
+The ``KinesisEgressSerializer`` allows users to specify such a schema.
+The ``EgressRecord serialize(T value)`` method gets called for each message, allowing users to set a value, and other metadata.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UserSerializer implements KinesisEgressSerializer<User> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UserSerializer.class);
+
+ private static final String STREAM = "user-stream";
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ @Override
+ public EgressRecord serialize(User value) {
+ try {
+ return EgressRecord.newBuilder()
+ .withPartitionKey(value.getUserId())
+ .withData(mapper.writeValueAsBytes(value))
+ .withStream(STREAM)
+ .build();
+ } catch (IOException e) {
+ LOG.info("Failed to serializer user", e);
+ return null;
+ }
+ }
+}
+{% endhighlight %}
+
+## AWS Region
+
+Both the Kinesis ingress and egress can be configured to a specific AWS region.
+
+#### Default Provider Chain (default)
+
+Consults AWS's default provider chain to determine the AWS region.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsRegion.fromDefaultProviderChain();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsRegion:
+ type: default
+{% endhighlight %}
+</div>
+</div>
+
+#### Specific
+
+Specifies an AWS region using the region's unique id.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsRegion.of("us-west-1");
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsRegion:
+ type: specific
+ id: us-west-1
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Custom Endpoint
+
+Connects to an AWS region through a non-standard AWS service endpoint.
+This is typically used only for development and testing purposes.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsRegion.ofCustomEndpoint("https://localhost:4567", "us-west-1");
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsRegion:
+ type: custom-endpoint
+ endpoint: https://localhost:4567
+ id: us-west-1
+{% endhighlight %}
+</div>
+</div>
+
+## AWS Credentials
+
+Both the Kinesis ingress and egress can be configured using standard AWS credential providers.
+
+#### Default Provider Chain (default)
+
+Consults AWS’s default provider chain to determine the AWS credentials.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsCredentials.fromDefaultProviderChain();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsCredentials:
+ type: default
+{% endhighlight %}
+</div>
+</div>
+
+#### Basic
+
+Specifies the AWS credentials directly with provided access key ID and secret access key strings.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsCredentials.basic("accessKeyId", "secretAccessKey");
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsCredentials:
+ type: basic
+ accessKeyId: access-key-id
+ secretAccessKey: secret-access-key
+{% endhighlight %}
+</div>
+</div>
+
+#### Profile
+
+Specifies the AWS credentials using an AWS configuration profile, along with the profile's configuration path.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsCredentials.profile("profile-name", "/path/to/profile/config");
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsCredentials:
+ type: profile
+ profileName: profile-name
+ profilePath: /path/to/profile/config
+{% endhighlight %}
+</div>
+</div>
+
diff --git a/docs/io-module/flink-connectors.md b/docs/io-module/flink-connectors.md
new file mode 100644
index 0000000..d25e539
--- /dev/null
+++ b/docs/io-module/flink-connectors.md
@@ -0,0 +1,95 @@
+---
+title: Flink Connectors
+nav-id: flink-connectors
+nav-pos: 3
+nav-title: Flink Connectors
+nav-parent_id: io-module
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+The source-sink I/O module allows you to plug in existing, or custom, Flink connectors that are not already integrated into a dedicated I/O module.
+For details on how to build a custom connector, see the official [Apache Flink documentation](https://ci.apache.org/projects/flink/flink-docs-stable).
+
+* This will be replaced by the TOC
+{:toc}
+
+## Dependency
+
+To use the Flink connectors (source-sink) I/O Module in Java, please include the following dependency in your pom.
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-flink-io</artifactId>
+ <version>{{ site.version }}</version>
+ <scope>provided</scope>
+</dependency>
+{% endhighlight %}
+
+## Source Spec
+
+A source function spec creates an ingress from a Flink source function.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.flink;
+
+import java.util.Map;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+public class ModuleWithSourceSpec implements StatefulFunctionModule {
+
+ @Override
+ public void configure(Map<String, String> globalConfiguration, Binder binder) {
+ IngressIdentifier<User> id = new IngressIdentifier<>(User.class, "example", "users");
+ IngressSpec<User> spec = new SourceFunctionSpec<>(id, new FlinkSource<>());
+ binder.bindIngress(spec);
+ }
+}
+{% endhighlight %}
+
+## Sink Spec
+
+A sink function spec creates an egress from a Flink sink function.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.flink;
+
+import java.util.Map;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+public class ModuleWithSinkSpec implements StatefulFunctionModule {
+
+ @Override
+ public void configure(Map<String, String> globalConfiguration, Binder binder) {
+ EgressIdentifier<User> id = new EgressIdentifier<>("example", "user", User.class);
+ EgressSpec<User> spec = new SinkFunctionSpec<>(id, new FlinkSink<>());
+ binder.bindEgress(spec);
+ }
+}
+{% endhighlight %}
\ No newline at end of file
diff --git a/docs/io-module/index.md b/docs/io-module/index.md
new file mode 100644
index 0000000..3ae0905
--- /dev/null
+++ b/docs/io-module/index.md
@@ -0,0 +1,243 @@
+---
+title: I/O Module
+nav-id: io-module
+nav-pos: 4
+nav-title: 'I/O Module'
+nav-parent_id: root
+nav-show_overview: true
+permalink: /io-module/index.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Stateful Functions' I/O modules allow functions to receive messages from, and send messages to, external systems.
+Based on the concept of Ingress (input) and Egress (output) points, and built on top of the Apache Flink connector ecosystem, I/O modules enable functions to interact with the outside world through message passing.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Ingress
+
+An Ingress is an input point where data is consumed from an external system and forwarded to zero or more functions.
+It is defined via an ``IngressIdentifier`` and an ``IngressSpec``.
+
+An ingress identifier, similar to a function type, uniquely identifies an ingress by specifying its input type, a namespace, and a name.
+
+The spec defines the details of how to connect to the external system, which are specific to each individual I/O module. Each identifier-spec pair is bound to the system inside a stateful function module.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.ingress;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+
+/** Identifiers shared by the ingress examples on this page. */
+public final class Identifiers {
+
+  /** Ingress of {@link User} records with namespace {@code example} and name {@code user-ingress}. */
+  public static final IngressIdentifier<User> INGRESS =
+      new IngressIdentifier<>(User.class, "example", "user-ingress");
+}
+{% endhighlight %}
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.ingress;
+
+import java.util.Map;
+import org.apache.flink.statefun.docs.io.MissingImplementationException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+/** Example module that binds a single ingress; the spec itself is left as a placeholder. */
+public class ModuleWithIngress implements StatefulFunctionModule {
+
+  @Override
+  public void configure(Map<String, String> globalConfiguration, Binder binder) {
+    binder.bindIngress(createIngress(Identifiers.INGRESS));
+  }
+
+  /**
+   * Placeholder for the spec of a concrete I/O module.
+   *
+   * @param identifier the ingress the spec should describe
+   * @throws MissingImplementationException always, until replaced with a real spec
+   */
+  private IngressSpec<User> createIngress(IngressIdentifier<User> identifier) {
+    throw new MissingImplementationException("Replace with your specific ingress");
+  }
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+ meta:
+ type: remote
+ spec:
+ ingresses:
+ - ingress:
+ meta:
+ id: example/user-ingress
+ type: # ingress type
+ spec: # ingress specific configurations
+{% endhighlight %}
+</div>
+</div>
+
+## Router
+
+A router is a stateless operator that takes each record from an ingress and routes it to zero or more functions.
+Routers are bound to the system via a stateful function module, and unlike other components, an ingress may have any number of routers.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.ingress;
+
+import org.apache.flink.statefun.docs.FnUser;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.Router;
+
+public class UserRouter implements Router<User> {
+
+ @Override
+ public void route(User message, Downstream<User> downstream) {
+ downstream.forward(FnUser.TYPE, message.getUserId(), message);
+ }
+}
+{% endhighlight %}
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.ingress;
+
+import java.util.Map;
+import org.apache.flink.statefun.docs.io.MissingImplementationException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.io.Router;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+/** Example module that binds an ingress together with a {@link Router} for its records. */
+public class ModuleWithRouter implements StatefulFunctionModule {
+
+  @Override
+  public void configure(Map<String, String> globalConfiguration, Binder binder) {
+    IngressSpec<User> spec = createIngressSpec(Identifiers.INGRESS);
+    Router<User> router = new UserRouter();
+
+    binder.bindIngress(spec);
+    // Routers are attached by ingress identifier, independently of the spec.
+    binder.bindIngressRouter(Identifiers.INGRESS, router);
+  }
+
+  /**
+   * Placeholder for the spec of a concrete I/O module (e.g. Kafka or Kinesis).
+   *
+   * @param identifier the ingress the spec should describe
+   * @throws MissingImplementationException always, until replaced with a real spec
+   */
+  private IngressSpec<User> createIngressSpec(IngressIdentifier<User> identifier) {
+    throw new MissingImplementationException("Replace with your specific ingress");
+  }
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+When defined in ``yaml``, routers are defined by a list of function types.
+The ``id`` component of the address is pulled from the key associated with each record in its underlying source implementation.
+{% highlight yaml %}
+targets:
+ - example-namespace/my-function-1
+ - example-namespace/my-function-2
+{% endhighlight %}
+</div>
+</div>
+
+## Egress
+
+Egress is the opposite of ingress; it is a point that takes messages and writes them to external systems.
+Each egress is defined using two components, an ``EgressIdentifier`` and an ``EgressSpec``.
+
+An egress identifier uniquely identifies an egress based on a namespace, name, and producing type.
+An egress spec defines the details of how to connect to the external system; these details are specific to each individual I/O module.
+Each identifier-spec pair is bound to the system inside a stateful function module.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.egress;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+
+/** Identifiers shared by the egress examples on this page. */
+public final class Identifiers {
+
+  /**
+   * Egress accepting {@link User} messages. The namespace/name pair matches the {@code
+   * example/user-egress} id used in the YAML definition of the same egress.
+   */
+  public static final EgressIdentifier<User> EGRESS =
+      new EgressIdentifier<>("example", "user-egress", User.class);
+}
+
+{% endhighlight %}
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.egress;
+
+import java.util.Map;
+import org.apache.flink.statefun.docs.io.MissingImplementationException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+/** Example module that binds a single egress; the spec itself is left as a placeholder. */
+public class ModuleWithEgress implements StatefulFunctionModule {
+
+  @Override
+  public void configure(Map<String, String> globalConfiguration, Binder binder) {
+    binder.bindEgress(createEgress(Identifiers.EGRESS));
+  }
+
+  /**
+   * Placeholder for the spec of a concrete I/O module.
+   *
+   * @param identifier the egress the spec should describe
+   * @throws MissingImplementationException always, until replaced with a real spec
+   */
+  public EgressSpec<User> createEgress(EgressIdentifier<User> identifier) {
+    throw new MissingImplementationException("Replace with your specific egress");
+  }
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+ meta:
+ type: remote
+ spec:
+ egresses:
+ - egress:
+ meta:
+ id: example/user-egress
+ type: # egress type
+ spec: # egress specific configurations
+{% endhighlight %}
+</div>
+</div>
+
+Stateful functions may then message an egress the same way they message another function, passing the egress identifier to ``context.send`` in place of a function type and id.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.egress;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+
+/** A simple function that outputs messages to an egress. */
+public class FnOutputting implements StatefulFunction {
+
+ @Override
+ public void invoke(Context context, Object input) {
+ context.send(Identifiers.EGRESS, new User());
+ }
+}
+{% endhighlight %}
\ No newline at end of file