Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/26 14:00:24 UTC

[flink-statefun] 06/13: [FLINK-16758][docs] Port I/O Module content

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 200362bd2c132eb7fbb4febc2b31daa9df33dbb0
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Mar 24 17:07:48 2020 -0500

    [FLINK-16758][docs] Port I/O Module content
---
 docs/io-module/apache-kafka.md     | 421 +++++++++++++++++++++++++++++++++
 docs/io-module/aws-kinesis.md      | 465 +++++++++++++++++++++++++++++++++++++
 docs/io-module/flink-connectors.md |  95 ++++++++
 docs/io-module/index.md            | 243 +++++++++++++++++++
 4 files changed, 1224 insertions(+)

diff --git a/docs/io-module/apache-kafka.md b/docs/io-module/apache-kafka.md
new file mode 100644
index 0000000..762fd27
--- /dev/null
+++ b/docs/io-module/apache-kafka.md
@@ -0,0 +1,421 @@
+---
+title: Apache Kafka
+nav-id: apache-kafka
+nav-pos: 1
+nav-title: Apache Kafka
+nav-parent_id: io-module
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics.
+It is based on Apache Flink's universal [Kafka connector](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html) and provides exactly-once processing semantics.
+The Kafka I/O Module is configurable in YAML or Java.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Dependency
+
+To use the Kafka I/O Module in Java, please include the following dependency in your pom.
+
+{% highlight xml %}
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>statefun-kafka-io</artifactId>
+	<version>{{ site.version }}</version>
+	<scope>provided</scope>
+</dependency>
+{% endhighlight %}
+
+## Kafka Ingress Spec
+
+A ``KafkaIngressSpec`` declares an ingress spec for consuming from a Kafka cluster.
+
+It accepts the following arguments:
+
+1. The ingress identifier associated with this ingress
+2. The topic name / list of topic names
+3. The address of the bootstrap servers
+4. The consumer group id to use
+5. A ``KafkaIngressDeserializer`` for deserializing data from Kafka (Java only)
+6. The position to start consuming from
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kafka;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
+
+public class IngressSpecs {
+
+  public static final IngressIdentifier<User> ID =
+      new IngressIdentifier<>(User.class, "example", "input-ingress");
+
+  public static final IngressSpec<User> kafkaIngress =
+      KafkaIngressBuilder.forIdentifier(ID)
+          .withKafkaAddress("localhost:9092")
+          .withConsumerGroupId("greetings")
+          .withTopic("my-topic")
+          .withDeserializer(UserDeserializer.class)
+          .withStartupPosition(KafkaIngressStartupPosition.fromLatest())
+          .build();
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+    meta:
+        type: remote
+    spec:
+        ingresses:
+          - ingress:
+              meta:
+                type: statefun.kafka.io/routable-protobuf-ingress
+                id: example/user-ingress
+              spec:
+                address: kafka-broker:9092
+                consumerGroupId: routable-kafka-e2e
+                startupPosition:
+                  type: earliest
+                topics:
+                  - topic: messages-1
+                    typeUrl: org.apache.flink.statefun.docs.models.User
+                    targets:
+                      - example-namespace/my-function-1
+                      - example-namespace/my-function-2
+{% endhighlight %}
+</div>
+</div>
+
+The ingress also accepts properties to directly configure the Kafka client, using ``KafkaIngressBuilder#withProperties(Properties)``.
+Please refer to the Kafka [consumer configuration](https://docs.confluent.io/current/installation/configuration/consumer-configs.html) documentation for the full list of available properties.
+Note that configuration passed using named methods, such as ``KafkaIngressBuilder#withConsumerGroupId(String)``, will have higher precedence and overwrite their respective settings in the provided properties.
+
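+For example, the following is a minimal sketch of passing consumer properties directly
+(the property keys here are illustrative; any valid consumer property works):
+
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("fetch.min.bytes", "1024");
+properties.setProperty("max.poll.records", "250");
+
+IngressSpec<User> kafkaIngress =
+    KafkaIngressBuilder.forIdentifier(ID)
+        .withKafkaAddress("localhost:9092")
+        .withTopic("my-topic")
+        .withDeserializer(UserDeserializer.class)
+        .withProperties(properties)
+        // named methods override the same keys in the provided properties
+        .withConsumerGroupId("greetings")
+        .build();
+{% endhighlight %}
+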
+### Startup Position
+
+The ingress allows configuring the startup position to be one of the following:
+
+#### From Group Offset (default)
+
+Starts from offsets that were committed to Kafka for the specified consumer group.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaIngressStartupPosition#fromGroupOffsets();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+    type: group-offsets
+{% endhighlight %}
+</div>
+</div>
+
+#### Earliest
+
+Starts from the earliest offset.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaIngressStartupPosition#fromEarliest();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+    type: earliest
+{% endhighlight %}
+</div>
+</div>
+
+#### Latest
+
+Starts from the latest offset.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaIngressStartupPosition#fromLatest();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+    type: latest
+{% endhighlight %}
+</div>
+</div>
+
+#### Specific Offsets
+
+Starts from specific offsets, defined as a map of partitions to their target starting offset.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Map<TopicPartition, Long> offsets = new HashMap<>();
+offsets.put(new TopicPartition("user-topic", 0), 91L);
+offsets.put(new TopicPartition("user-topic", 1), 11L);
+offsets.put(new TopicPartition("user-topic", 2), 8L);
+
+KafkaIngressStartupPosition#fromSpecificOffsets(offsets);
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+    type: specific-offsets
+    offsets:
+        - user-topic/0: 91
+        - user-topic/1: 11
+        - user-topic/2: 8
+{% endhighlight %}
+</div>
+</div>
+
+#### Date
+
+Starts from offsets whose ingestion time is greater than or equal to a specified date.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaIngressStartupPosition#fromDate(ZonedDateTime.now());
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+    type: date
+    date: 2020-02-01 04:15:00.00 Z
+{% endhighlight %}
+</div>
+</div>
+
+On startup, if the specified startup offset for a partition is out-of-range or does not exist (which may be the case if the ingress is configured to start from group offsets, specific offsets, or from a date), then the ingress will fall back to the position configured using ``KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)``.
+By default, this is set to the latest position.
+
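+For example, to fall back to the earliest position instead (a minimal sketch, assuming
+the ``KafkaIngressAutoResetPosition`` enum exposes an ``EARLIEST`` constant):
+
+{% highlight java %}
+KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition.EARLIEST);
+{% endhighlight %}
+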
+### Kafka Deserializer
+
+When using the Java API, the Kafka ingress needs to know how to turn the binary data in Kafka into Java objects.
+The ``KafkaIngressDeserializer`` allows users to specify such a schema.
+The ``T deserialize(ConsumerRecord<byte[], byte[]> record)`` method gets called for each Kafka message, passing the key, value, and metadata from Kafka.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UserDeserializer implements KafkaIngressDeserializer<User> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(UserDeserializer.class);
+
+	private final ObjectMapper mapper = new ObjectMapper();
+
+	@Override
+	public User deserialize(ConsumerRecord<byte[], byte[]> input) {
+		try {
+			return mapper.readValue(input.value(), User.class);
+		} catch (IOException e) {
+			LOG.debug("Failed to deserialize record", e);
+			return null;
+		}
+	}
+}
+{% endhighlight %}
+
+## Kafka Egress Spec
+
+A ``KafkaEgressBuilder`` declares an egress spec for writing data out to a Kafka cluster.
+
+It accepts the following arguments:
+
+1. The egress identifier associated with this egress
+2. The address of the bootstrap servers
+3. A ``KafkaEgressSerializer`` for serializing data into Kafka (Java only)
+4. The fault tolerance semantic
+5. Properties for the Kafka producer
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kafka;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
+
+public class EgressSpecs {
+
+  public static final EgressIdentifier<User> ID =
+      new EgressIdentifier<>("example", "output-egress", User.class);
+
+  public static final EgressSpec<User> kafkaEgress =
+      KafkaEgressBuilder.forIdentifier(ID)
+          .withKafkaAddress("localhost:9092")
+          .withSerializer(UserSerializer.class)
+          .build();
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+    meta:
+        type: remote
+    spec:
+        egresses:
+          - egress:
+              meta:
+                type: statefun.kafka.io/generic-egress
+                id: example/output-messages
+              spec:
+                address: kafka-broker:9092
+                deliverySemantic:
+                  type: exactly-once
+                  transactionTimeoutMillis: 100000
+                properties:
+                  - foo.config: bar
+{% endhighlight %}
+</div>
+</div>
+
+Please refer to the Kafka [producer configuration](https://docs.confluent.io/current/installation/configuration/producer-configs.html) documentation for the full list of available properties.
+
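+The following is a minimal sketch of passing producer properties directly; it assumes
+the egress builder exposes a ``withProperties(Properties)`` method analogous to the
+ingress builder, and the property key is illustrative:
+
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("compression.type", "lz4");
+
+EgressSpec<User> kafkaEgress =
+    KafkaEgressBuilder.forIdentifier(ID)
+        .withKafkaAddress("localhost:9092")
+        .withSerializer(UserSerializer.class)
+        .withProperties(properties)
+        .build();
+{% endhighlight %}
+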
+### Kafka Egress and Fault Tolerance
+
+With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees.
+You can choose from three different modes of operation.
+
+#### None
+
+Nothing is guaranteed; produced records can be lost or duplicated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaEgressBuilder#withNoProducerSemantics();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+deliverySemantic:
+    type: none
+{% endhighlight %}
+</div>
+</div>
+
+#### At Least Once
+
+Stateful Functions guarantees that no records are lost, but they may be duplicated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaEgressBuilder#withAtLeastOnceProducerSemantics();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+deliverySemantic:
+    type: at-least-once
+{% endhighlight %}
+</div>
+</div>
+
+#### Exactly Once
+
+Stateful Functions uses Kafka transactions to provide exactly-once semantics.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KafkaEgressBuilder#withExactlyOnceProducerSemantics(Duration.ofMinutes(15));
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+deliverySemantic:
+    type: exactly-once
+    transactionTimeoutMillis: 900000 # 15 min
+{% endhighlight %}
+</div>
+</div>
+
+### Kafka Serializer
+
+When using the Java API, the Kafka egress needs to know how to turn Java objects into binary data.
+The ``KafkaEgressSerializer`` allows users to specify such a schema.
+The ``ProducerRecord<byte[], byte[]> serialize(T out)`` method gets called for each message, allowing users to set a key, value, and other metadata.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kafka;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UserSerializer implements KafkaEgressSerializer<User> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UserSerializer.class);
+
+  private static final String TOPIC = "user-topic";
+
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  @Override
+  public ProducerRecord<byte[], byte[]> serialize(User user) {
+    try {
+      byte[] key = user.getUserId().getBytes();
+      byte[] value = mapper.writeValueAsBytes(user);
+
+      return new ProducerRecord<>(TOPIC, key, value);
+    } catch (JsonProcessingException e) {
+      LOG.info("Failed to serialize user", e);
+      return null;
+    }
+  }
+}
+{% endhighlight %}
\ No newline at end of file
diff --git a/docs/io-module/aws-kinesis.md b/docs/io-module/aws-kinesis.md
new file mode 100644
index 0000000..65e7d51
--- /dev/null
+++ b/docs/io-module/aws-kinesis.md
@@ -0,0 +1,465 @@
+---
+title: AWS Kinesis
+nav-id: aws-kinesis
+nav-pos: 2
+nav-title: AWS Kinesis
+nav-parent_id: io-module
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Stateful Functions offers an AWS Kinesis I/O Module for reading from and writing to Kinesis streams.
+It is based on Apache Flink's [Kinesis connector](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kinesis.html).
+The Kinesis I/O Module is configurable in YAML or Java.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Dependency
+
+To use the Kinesis I/O Module in Java, please include the following dependency in your pom.
+
+{% highlight xml %}
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>statefun-kinesis-io</artifactId>
+    <version>{{ site.version }}</version>
+    <scope>provided</scope>
+</dependency>
+{% endhighlight %}
+
+## Kinesis Ingress Spec
+
+A ``KinesisIngressSpec`` declares an ingress spec for consuming from a Kinesis stream.
+
+It accepts the following arguments:
+
+1. The AWS region
+2. An AWS credentials provider
+3. A ``KinesisIngressDeserializer`` for deserializing data from Kinesis (Java only)
+4. The stream start position
+5. Properties for the Kinesis client
+6. The name of the stream to consume from
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition;
+
+public class IngressSpecs {
+
+  public static final IngressIdentifier<User> ID =
+      new IngressIdentifier<>(User.class, "example", "input-ingress");
+
+  public static final IngressSpec<User> kinesisIngress =
+      KinesisIngressBuilder.forIdentifier(ID)
+          .withAwsRegion("us-west-1")
+          .withAwsCredentials(AwsCredentials.fromDefaultProviderChain())
+          .withDeserializer(UserDeserializer.class)
+          .withStream("stream-name")
+          .withStartupPosition(KinesisIngressStartupPosition.fromEarliest())
+          .withClientConfigurationProperty("key", "value")
+          .build();
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+    meta:
+        type: remote
+    spec:
+        ingresses:
+          - ingress:
+              meta:
+                type: statefun.kinesis.io/routable-protobuf-ingress
+                id: example-namespace/messages
+              spec:
+                awsRegion:
+                  type: specific
+                  id: us-west-1
+                awsCredentials:
+                  type: basic
+                  accessKeyId: my_access_key_id
+                  secretAccessKey: my_secret_access_key
+                startupPosition:
+                  type: earliest
+                streams:
+                  - stream: stream-1
+                    typeUrl: com.googleapis/org.apache.flink.statefun.docs.models.User
+                    targets:
+                      - example-namespace/my-function-1
+                      - example-namespace/my-function-2
+                  - stream: stream-2
+                    typeUrl: com.googleapis/org.apache.flink.statefun.docs.models.User
+                    targets:
+                      - example-namespace/my-function-1
+                clientConfigProperties:
+                  - SocketTimeout: 9999
+                  - MaxConnections: 15
+{% endhighlight %}
+</div>
+</div>
+
+The ingress also accepts properties to directly configure the Kinesis client, using ``KinesisIngressBuilder#withClientConfigurationProperty()``.
+Please refer to the Kinesis [client configuration](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) documentation for the full list of available properties.
+Note that configuration passed using named methods will have higher precedence and overwrite their respective settings in the provided properties.
+
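+For example, using the property names shown in the ingress spec above (values are
+illustrative):
+
+{% highlight java %}
+KinesisIngressBuilder#withClientConfigurationProperty("SocketTimeout", "9999");
+KinesisIngressBuilder#withClientConfigurationProperty("MaxConnections", "15");
+{% endhighlight %}
+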
+### Startup Position
+
+The ingress allows configuring the startup position to be one of the following:
+
+#### Latest (default)
+
+Start consuming from the latest position, i.e., the head of the stream shards.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KinesisIngressStartupPosition#fromLatest();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+    type: latest
+{% endhighlight %}
+</div>
+</div>
+
+#### Earliest
+
+Start consuming from the earliest position possible.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KinesisIngressStartupPosition#fromEarliest();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+    type: earliest
+{% endhighlight %}
+</div>
+</div>
+
+#### Date
+
+Starts from positions whose ingestion time is greater than or equal to a specified date.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+KinesisIngressStartupPosition#fromDate(ZonedDateTime.now());
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+startupPosition:
+    type: date
+    date: 2020-02-01 04:15:00.00 Z
+{% endhighlight %}
+</div>
+</div>
+
+### Kinesis Deserializer
+
+The Kinesis ingress needs to know how to turn the binary data in Kinesis into Java objects.
+The ``KinesisIngressDeserializer`` allows users to specify such a schema.
+The ``T deserialize(IngressRecord ingressRecord)`` method gets called for each Kinesis record, passing the binary data and metadata from Kinesis.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UserDeserializer implements KinesisIngressDeserializer<User> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UserDeserializer.class);
+
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  @Override
+  public User deserialize(IngressRecord ingressRecord) {
+    try {
+      return mapper.readValue(ingressRecord.getData(), User.class);
+    } catch (IOException e) {
+      LOG.debug("Failed to deserialize record", e);
+      return null;
+    }
+  }
+}
+{% endhighlight %}
+
+## Kinesis Egress Spec
+
+A ``KinesisEgressBuilder`` declares an egress spec for writing data out to a Kinesis stream.
+
+It accepts the following arguments:
+
+1. The egress identifier associated with this egress
+2. The AWS credentials provider
+3. A ``KinesisEgressSerializer`` for serializing data into Kinesis (Java only)
+4. The AWS region
+5. Properties for the Kinesis client
+6. The maximum number of outstanding records before backpressure is applied
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
+
+public class EgressSpecs {
+
+  public static final EgressIdentifier<User> ID =
+      new EgressIdentifier<>("example", "output-egress", User.class);
+
+  public static final EgressSpec<User> kinesisEgress =
+      KinesisEgressBuilder.forIdentifier(ID)
+          .withAwsCredentials(AwsCredentials.fromDefaultProviderChain())
+          .withAwsRegion("us-west-1")
+          .withMaxOutstandingRecords(100)
+          .withClientConfigurationProperty("key", "value")
+          .withSerializer(UserSerializer.class)
+          .build();
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+    meta:
+        type: remote
+    spec:
+        egresses:
+          - egress:
+              meta:
+                type: statefun.kinesis.io/generic-egress
+                id: example/output-messages
+              spec:
+                awsRegion:
+                  type: custom-endpoint
+                  endpoint: https://localhost:4567
+                  id: us-west-1
+                awsCredentials:
+                  type: profile
+                  profileName: john-doe
+                  profilePath: /path/to/profile/config
+                maxOutstandingRecords: 9999
+                clientConfigProperties:
+                  - ThreadingModel: POOLED
+                  - ThreadPoolSize: 10
+{% endhighlight %}
+</div>
+</div>
+
+Please refer to the Kinesis [producer default configuration properties](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties) documentation for the full list of available properties.
+
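+For example, using the property names shown in the egress spec above (values are
+illustrative):
+
+{% highlight java %}
+KinesisEgressBuilder#withClientConfigurationProperty("ThreadingModel", "POOLED");
+KinesisEgressBuilder#withClientConfigurationProperty("ThreadPoolSize", "10");
+{% endhighlight %}
+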
+### Kinesis Serializer
+
+The Kinesis egress needs to know how to turn Java objects into binary data.
+The ``KinesisEgressSerializer`` allows users to specify such a schema.
+The ``EgressRecord serialize(T value)`` method gets called for each message, allowing users to set a value and other metadata.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UserSerializer implements KinesisEgressSerializer<User> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UserSerializer.class);
+
+  private static final String STREAM = "user-stream";
+
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  @Override
+  public EgressRecord serialize(User value) {
+    try {
+      return EgressRecord.newBuilder()
+          .withPartitionKey(value.getUserId())
+          .withData(mapper.writeValueAsBytes(value))
+          .withStream(STREAM)
+          .build();
+    } catch (IOException e) {
+      LOG.info("Failed to serialize user", e);
+      return null;
+    }
+  }
+}
+{% endhighlight %}
+
+## AWS Region
+
+Both the Kinesis ingress and egress can be configured to work with a specific AWS region.
+
+#### Default Provider Chain (default)
+
+Consults AWS's default provider chain to determine the AWS region.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsRegion.fromDefaultProviderChain();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsRegion:
+    type: default
+{% endhighlight %}
+</div>
+</div>
+
+#### Specific
+
+Specifies an AWS region using the region's unique id.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsRegion.of("us-west-1");
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsRegion:
+    type: specific
+    id: us-west-1
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Custom Endpoint
+
+Connects to an AWS region through a non-standard AWS service endpoint.
+This is typically used only for development and testing purposes.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsRegion.ofCustomEndpoint("https://localhost:4567", "us-west-1");
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsRegion:
+    type: custom-endpoint
+    endpoint: https://localhost:4567
+    id: us-west-1
+{% endhighlight %}
+</div>
+</div>
+
+## AWS Credentials
+
+Both the Kinesis ingress and egress can be configured using standard AWS credential providers.
+
+#### Default Provider Chain (default)
+
+Consults AWS’s default provider chain to determine the AWS credentials.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsCredentials.fromDefaultProviderChain();
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsCredentials:
+    type: default
+{% endhighlight %}
+</div>
+</div>
+
+#### Basic
+
+Specifies the AWS credentials directly with the provided access key ID and secret access key strings.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsCredentials.basic("accessKeyId", "secretAccessKey");
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsCredentials:
+    type: basic
+    accessKeyId: access-key-id
+    secretAccessKey: secret-access-key
+{% endhighlight %}
+</div>
+</div>
+
+#### Profile
+
+Specifies the AWS credentials using an AWS configuration profile, along with the profile's configuration path.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AwsCredentials.profile("profile-name", "/path/to/profile/config");
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+awsCredentials:
+    type: profile
+    profileName: profile-name
+    profilePath: /path/to/profile/config
+{% endhighlight %}
+</div>
+</div>
+
diff --git a/docs/io-module/flink-connectors.md b/docs/io-module/flink-connectors.md
new file mode 100644
index 0000000..d25e539
--- /dev/null
+++ b/docs/io-module/flink-connectors.md
@@ -0,0 +1,95 @@
+---
+title: Flink Connectors
+nav-id: flink-connectors
+nav-pos: 3
+nav-title: Flink Connectors
+nav-parent_id: io-module
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+The source-sink I/O module allows you to plug in existing, or custom, Flink connectors that are not already integrated into a dedicated I/O module.
+For details on how to build a custom connector, see the official [Apache Flink documentation](https://ci.apache.org/projects/flink/flink-docs-stable).
+
+* This will be replaced by the TOC
+{:toc}
+
+## Dependency
+
+To use the source-sink I/O module in Java, please include the following dependency in your pom.
+
+{% highlight xml %}
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>statefun-flink-io</artifactId>
+    <version>{{ site.version }}</version>
+    <scope>provided</scope>
+</dependency>
+{% endhighlight %}
+
+## Source Spec
+
+A source function spec creates an ingress from a Flink source function.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.flink;
+
+import java.util.Map;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+public class ModuleWithSourceSpec implements StatefulFunctionModule {
+
+    @Override
+    public void configure(Map<String, String> globalConfiguration, Binder binder) {
+        IngressIdentifier<User> id = new IngressIdentifier<>(User.class, "example", "users");
+        IngressSpec<User> spec = new SourceFunctionSpec<>(id, new FlinkSource<>());
+        binder.bindIngress(spec);
+    }
+}
+{% endhighlight %}
+
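+``FlinkSource`` above is a placeholder for any Flink ``SourceFunction``; it is not part
+of the SDK. A minimal hypothetical implementation (the class name and emitted values
+are illustrative only; here non-generic for brevity) might look like this:
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.flink;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+public class FlinkSource implements SourceFunction<User> {
+
+    private volatile boolean running = true;
+
+    @Override
+    public void run(SourceContext<User> ctx) throws Exception {
+        // emit a record roughly once per second until the source is cancelled
+        while (running) {
+            ctx.collect(new User());
+            Thread.sleep(1000L);
+        }
+    }
+
+    @Override
+    public void cancel() {
+        running = false;
+    }
+}
+{% endhighlight %}
+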
+## Sink Spec
+
+A sink function spec creates an egress from a Flink sink function.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.flink;
+
+import java.util.Map;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+public class ModuleWithSinkSpec implements StatefulFunctionModule {
+
+    @Override
+    public void configure(Map<String, String> globalConfiguration, Binder binder) {
+        EgressIdentifier<User> id = new EgressIdentifier<>("example", "user", User.class);
+        EgressSpec<User> spec = new SinkFunctionSpec<>(id, new FlinkSink<>());
+        binder.bindEgress(spec);
+    }
+}
+{% endhighlight %}
\ No newline at end of file
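+
+``FlinkSink`` above is likewise a placeholder for any Flink ``SinkFunction``. A minimal
+hypothetical implementation (illustrative only; here non-generic for brevity) might be:
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.flink;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+public class FlinkSink implements SinkFunction<User> {
+
+    @Override
+    public void invoke(User value, Context context) {
+        // write the record to the external system; here it is simply printed
+        System.out.println(value);
+    }
+}
+{% endhighlight %}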
diff --git a/docs/io-module/index.md b/docs/io-module/index.md
new file mode 100644
index 0000000..3ae0905
--- /dev/null
+++ b/docs/io-module/index.md
@@ -0,0 +1,243 @@
+---
+title: I/O Module 
+nav-id: io-module
+nav-pos: 4
+nav-title: 'I/O Module'
+nav-parent_id: root
+nav-show_overview: true 
+permalink: /io-module/index.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Stateful Functions' I/O modules allow functions to receive and send messages to external systems.
+Based on the concept of Ingress (input) and Egress (output) points, and built on top of the Apache Flink connector ecosystem, I/O modules enable functions to interact with the outside world in the style of message passing.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Ingress
+
+An Ingress is an input point where data is consumed from an external system and forwarded to zero or more functions.
+It is defined via an ``IngressIdentifier`` and an ``IngressSpec``.
+
+An ingress identifier, similar to a function type, uniquely identifies an ingress by specifying its input type, a namespace, and a name.
+
+The spec defines the details of how to connect to the external system, which is specific to each individual I/O module. Each identifier-spec pair is bound to the system inside a stateful function module.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.ingress;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+
+public final class Identifiers {
+
+    public static final IngressIdentifier<User> INGRESS =
+        new IngressIdentifier<>(User.class, "example", "user-ingress");
+}
+{% endhighlight %}
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.ingress;
+
+import java.util.Map;
+import org.apache.flink.statefun.docs.io.MissingImplementationException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+public class ModuleWithIngress implements StatefulFunctionModule {
+
+    @Override
+    public void configure(Map<String, String> globalConfiguration, Binder binder) {
+        IngressSpec<User> spec = createIngress(Identifiers.INGRESS);
+        binder.bindIngress(spec);
+    }
+
+    private IngressSpec<User> createIngress(IngressIdentifier<User> identifier) {
+        throw new MissingImplementationException("Replace with your specific ingress");
+    }
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+    meta:
+        type: remote
+    spec:
+        ingresses:
+          - ingress:
+              meta:
+                id: example/user-ingress
+                type: # ingress type
+              spec: # ingress specific configurations
+{% endhighlight %}
+</div>
+</div>
+
+## Router
+
+A router is a stateless operator that takes each record from an ingress and routes it to zero or more functions.
+Routers are bound to the system via a stateful function module, and unlike other components, an ingress may have any number of routers.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.ingress;
+
+import org.apache.flink.statefun.docs.FnUser;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.Router;
+
+public class UserRouter implements Router<User> {
+
+    @Override
+    public void route(User message, Downstream<User> downstream) {
+        downstream.forward(FnUser.TYPE, message.getUserId(), message);
+    }
+}
+{% endhighlight %}
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.ingress;
+
+import java.util.Map;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.io.Router;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+public class ModuleWithRouter implements StatefulFunctionModule {
+
+    @Override
+    public void configure(Map<String, String> globalConfiguration, Binder binder) {
+        IngressSpec<User> spec = createIngressSpec(Identifiers.INGRESS);
+        Router<User> router = new UserRouter();
+
+        binder.bindIngress(spec);
+        binder.bindIngressRouter(Identifiers.INGRESS, router);
+    }
+
+    private IngressSpec<User> createIngressSpec(IngressIdentifier<User> identifier) {
+        throw new MissingImplementationException("Replace with your specific ingress");
+    }
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+When defined in ``yaml``, a router is declared as a list of function types.
+The ``id`` component of the address is pulled from the key associated with each record in its underlying source implementation.
+{% highlight yaml %}
+targets:
+    - example-namespace/my-function-1
+    - example-namespace/my-function-2
+{% endhighlight %}
+</div>
+</div>
+
+## Egress
+
+Egress is the opposite of ingress; it is a point that takes messages and writes them to external systems.
+Each egress is defined using two components, an ``EgressIdentifier`` and an ``EgressSpec``.
+
+An egress identifier uniquely identifies an egress based on a namespace, name, and producing type.
+An egress spec defines the details of how to connect to the external system; the details are specific to each individual I/O module.
+Each identifier-spec pair is bound to the system inside a stateful function module.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.egress;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+
+public final class Identifiers {
+
+    public static final EgressIdentifier<User> EGRESS =
+        new EgressIdentifier<>("example", "egress", User.class);
+}
+{% endhighlight %}
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.egress;
+
+import java.util.Map;
+import org.apache.flink.statefun.docs.io.MissingImplementationException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+public class ModuleWithEgress implements StatefulFunctionModule {
+
+    @Override
+    public void configure(Map<String, String> globalConfiguration, Binder binder) {
+        EgressSpec<User> spec = createEgress(Identifiers.EGRESS);
+        binder.bindEgress(spec);
+    }
+
+    public EgressSpec<User> createEgress(EgressIdentifier<User> identifier) {
+        throw new MissingImplementationException("Replace with your specific egress");
+    }
+}
+{% endhighlight %}
+</div>
+<div data-lang="yaml" markdown="1">
+{% highlight yaml %}
+version: "1.0"
+
+module:
+    meta:
+        type: remote
+    spec:
+        egresses:
+          - egress:
+              meta:
+                id: example/user-egress
+                type: # egress type
+              spec: # egress specific configurations
+{% endhighlight %}
+</div>
+</div>
+
+Stateful functions may then message an egress the same way they message another function, passing the egress identifier as the function type.
+
+{% highlight java %}
+package org.apache.flink.statefun.docs.io.egress;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+
+/** A simple function that outputs messages to an egress. */
+public class FnOutputting implements StatefulFunction {
+
+    @Override
+    public void invoke(Context context, Object input) {
+        context.send(Identifiers.EGRESS, new User());
+    }
+}
+{% endhighlight %}
\ No newline at end of file