You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/22 07:52:28 UTC

[flink-statefun] branch master updated (4ac1dba -> 286a5af)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 4ac1dba  [FLINK-16124] [kinesis] Bind KinesisSinkProvider to Java Kinesis Egress Type
     add a485905  [hotfix] [kafka] Fix raw types in KafkaIngressBuilder
     add e5af42d  [FLINK-16124] [kinesis] Support building with KinesisIngressDeserializer instances at runtime
     add 3cbfe58  [hotfix] [core] Selectors.propertiesAt should always parse property values as strings
     add 7690bde  [FLINK-16124] [core] Parse spec JSON in JsonIngressSpec
     new 41c900e  [FLINK-16124] [kinesis] Implement runtime RoutableProtobufKinesisSourceProvider
     new c85171c  [FLINK-16124] [core] Bind AutoRoutableProtobufRouter for Routable Protobuf Kinesis ingresses
     new e07f9d3  [FLINK-16124] [kinesis] Bind Routable Protobuf Kafka ingress type in KinesisFlinkIOModule
     new db049d3  [FLINK-16124] [kinesis] Introduce KinesisEgressRecord protobuf message
     new 87cf108  [FLINK-16124] [core] Parse spec JSON within JsonEgressSpec
     new d0c74a7  [FLINK-16124] [kinesis] Implement runtime GenericKinesisSinkProvider
     new 01866d5  [FLINK-16124] [kinesis] Bind Generic Kafka egress type in KinesisFlinkIOModule
     new dcfa75d  [hotfix][docs] Add tabs and fix broken links
     new 7fcdae2  [FLINK-16557][docs] Document basic yaml i/o modules
     new b3a9ea1  [FLINK-16557][docs] Document YAML-ized Kafka egresses / ingresses in Stateful Functions documentation
     new 286a5af  [FLINK-16700][docs] Document Kinesis I/O Module

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 statefun-docs/docs/_static/css/customize-theme.css |  15 +-
 statefun-docs/docs/_templates/layout.html          |   1 +
 statefun-docs/docs/_templates/searchbox.html       |   0
 statefun-docs/docs/conf.py                         |  20 +-
 statefun-docs/docs/index_grid.html                 |   5 +-
 statefun-docs/docs/io_module/index.rst             | 116 +++++--
 statefun-docs/docs/io_module/kafka.rst             | 278 +++++++++++++--
 statefun-docs/docs/io_module/kinesis.rst           | 374 +++++++++++++++++++++
 statefun-docs/pom.xml                              |   5 +
 statefun-docs/requirements.txt                     |  29 +-
 statefun-docs/runtime.txt                          |   2 +-
 .../docs/io/{kafka => kinesis}/EgressSpecs.java    |  14 +-
 .../docs/io/{kafka => kinesis}/IngressSpecs.java   |  20 +-
 .../io/{kafka => kinesis}/UserDeserializer.java    |  12 +-
 .../docs/io/{kafka => kinesis}/UserSerializer.java |  25 +-
 .../statefun/flink/common/json/Selectors.java      |   3 -
 .../statefun/flink/core/jsonmodule/JsonModule.java |   8 +-
 .../protorouter/AutoRoutableProtobufRouter.java    |   4 +-
 .../flink/io/kinesis/KinesisFlinkIOModule.java     |   7 +
 .../flink/io/kinesis/KinesisSinkProvider.java      |   2 +-
 .../flink/io/kinesis/KinesisSourceProvider.java    |   8 +-
 .../io/kinesis/polyglot/AwsAuthSpecJsonParser.java | 130 +++++++
 .../polyglot/GenericKinesisEgressSerializer.java   |  62 ++++
 .../polyglot/GenericKinesisSinkProvider.java       |  92 +++++
 .../polyglot/KinesisEgressSpecJsonParser.java}     |  31 +-
 .../polyglot/KinesisIngressSpecJsonParser.java     | 146 ++++++++
 ...outableProtobufKinesisIngressDeserializer.java} |  30 +-
 .../RoutableProtobufKinesisSourceProvider.java     |  98 ++++++
 .../KinesisIngressBuilderApiExtension.java}        |  11 +-
 .../GenericKinesisSinkProviderTest.java}           |  17 +-
 ...RoutableProtobufKinesisSourceProviderTest.java} |  17 +-
 ...fka-egress.yaml => generic-kinesis-egress.yaml} |  20 +-
 ...yaml => routable-protobuf-kinesis-ingress.yaml} |  27 +-
 .../PolyglotKinesisIOTypes.java}                   |  17 +-
 .../statefun/flink/io/spi/JsonEgressSpec.java      |   8 +
 .../statefun/flink/io/spi/JsonIngressSpec.java     |   8 +
 .../{kafka-egress.proto => kinesis-egress.proto}   |   7 +-
 .../statefun/sdk/kafka/KafkaIngressBuilder.java    |   2 +-
 .../sdk/kinesis/ingress/KinesisIngressBuilder.java |  42 ++-
 .../sdk/kinesis/ingress/KinesisIngressSpec.java    |  10 +-
 .../sdk/kinesis/KinesisIngressBuilderTest.java     |   4 +-
 41 files changed, 1499 insertions(+), 228 deletions(-)
 delete mode 100644 statefun-docs/docs/_templates/searchbox.html
 create mode 100644 statefun-docs/docs/io_module/kinesis.rst
 copy statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/{kafka => kinesis}/EgressSpecs.java (70%)
 copy statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/{kafka => kinesis}/IngressSpecs.java (64%)
 copy statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/{kafka => kinesis}/UserDeserializer.java (76%)
 copy statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/{kafka => kinesis}/UserSerializer.java (66%)
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisSinkProvider.java
 copy statefun-flink/{statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java => statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisEgressSpecJsonParser.java} (53%)
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java
 copy statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/{kafka/RoutableProtobufKafkaIngressDeserializer.java => kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java} (61%)
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java
 copy statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/{kafka/KafkaIngressBuilderApiExtension.java => kinesis/ingress/KinesisIngressBuilderApiExtension.java} (72%)
 copy statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/{kafka/GenericKafkaSinkProviderTest.java => kinesis/GenericKinesisSinkProviderTest.java} (74%)
 copy statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/{kafka/RoutableProtobufKafkaSourceProviderTest.java => kinesis/RoutableProtobufKinesisSourceProviderTest.java} (71%)
 copy statefun-flink/statefun-flink-io-bundle/src/test/resources/{generic-kafka-egress.yaml => generic-kinesis-egress.yaml} (68%)
 copy statefun-flink/statefun-flink-io-bundle/src/test/resources/{routable-protobuf-kafka-ingress.yaml => routable-protobuf-kinesis-ingress.yaml} (75%)
 copy statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/{spi/FlinkIoModule.java => kinesis/PolyglotKinesisIOTypes.java} (69%)
 copy statefun-flink/statefun-flink-io/src/main/protobuf/{kafka-egress.proto => kinesis-egress.proto} (89%)


[flink-statefun] 05/11: [FLINK-16124] [core] Parse spec JSON within JsonEgressSpec

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 87cf1084dc6d3a84e1a5391c7b044c27e92d16b9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Mar 20 13:22:02 2020 +0800

    [FLINK-16124] [core] Parse spec JSON within JsonEgressSpec
---
 .../org/apache/flink/statefun/flink/io/spi/JsonEgressSpec.java    | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonEgressSpec.java b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonEgressSpec.java
index 76ca193..5868840 100644
--- a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonEgressSpec.java
+++ b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonEgressSpec.java
@@ -19,12 +19,16 @@
 package org.apache.flink.statefun.flink.io.spi;
 
 import java.util.Objects;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.statefun.sdk.EgressType;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 import org.apache.flink.statefun.sdk.io.EgressSpec;
 
 public final class JsonEgressSpec<T> implements EgressSpec<T> {
+
+  private static final JsonPointer SPEC_POINTER = JsonPointer.compile("/egress/spec");
+
   private final JsonNode json;
   private final EgressIdentifier<T> id;
   private final EgressType type;
@@ -48,4 +52,8 @@ public final class JsonEgressSpec<T> implements EgressSpec<T> {
   public JsonNode json() {
     return json;
   }
+
+  public JsonNode specJson() {
+    return json.requiredAt(SPEC_POINTER); // node at SPEC_POINTER ("/egress/spec") within json
+  }
 }


[flink-statefun] 03/11: [FLINK-16124] [kinesis] Bind Routable Protobuf Kafka ingress type in KinesisFlinkIOModule

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit e07f9d3b2203b3aa0bf34c24adcd4516c171cfb3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Mar 18 01:46:08 2020 +0800

    [FLINK-16124] [kinesis] Bind Routable Protobuf Kafka ingress type in KinesisFlinkIOModule
---
 .../apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java  | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java
index c3cbd0a..f404e32 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.io.kinesis;
 
 import com.google.auto.service.AutoService;
 import java.util.Map;
+import org.apache.flink.statefun.flink.io.kinesis.polyglot.RoutableProtobufKinesisSourceProvider;
 import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
 import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes;
 
@@ -29,5 +30,8 @@ public final class KinesisFlinkIOModule implements FlinkIoModule {
   public void configure(Map<String, String> globalConfiguration, Binder binder) {
     binder.bindSourceProvider(KinesisIOTypes.UNIVERSAL_INGRESS_TYPE, new KinesisSourceProvider());
     binder.bindSinkProvider(KinesisIOTypes.UNIVERSAL_EGRESS_TYPE, new KinesisSinkProvider());
+    binder.bindSourceProvider(
+        PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE,
+        new RoutableProtobufKinesisSourceProvider());
   }
 }


[flink-statefun] 11/11: [FLINK-16700][docs] Document Kinesis I/O Module

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 286a5af36a1e4958f669ab13d4ccbe08da071a21
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Fri Mar 20 10:38:23 2020 -0500

    [FLINK-16700][docs] Document Kinesis I/O Module
    
    This closes #65.
---
 statefun-docs/docs/io_module/index.rst             |   1 +
 statefun-docs/docs/io_module/kinesis.rst           | 374 +++++++++++++++++++++
 statefun-docs/pom.xml                              |   5 +
 .../statefun/docs/io/kinesis/EgressSpecs.java      |  39 +++
 .../statefun/docs/io/kinesis/IngressSpecs.java     |  41 +++
 .../statefun/docs/io/kinesis/UserDeserializer.java |  43 +++
 .../statefun/docs/io/kinesis/UserSerializer.java   |  49 +++
 7 files changed, 552 insertions(+)

diff --git a/statefun-docs/docs/io_module/index.rst b/statefun-docs/docs/io_module/index.rst
index ec2f299..90ae8b4 100644
--- a/statefun-docs/docs/io_module/index.rst
+++ b/statefun-docs/docs/io_module/index.rst
@@ -21,6 +21,7 @@ I/O Module
   :hidden:
 
   kafka
+  kinesis
   source_sink
   custom
 
diff --git a/statefun-docs/docs/io_module/kinesis.rst b/statefun-docs/docs/io_module/kinesis.rst
new file mode 100644
index 0000000..e912d10
--- /dev/null
+++ b/statefun-docs/docs/io_module/kinesis.rst
@@ -0,0 +1,374 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+   http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+###########
+AWS Kinesis
+###########
+
+Stateful Functions offers an AWS Kinesis I/O Module for reading from and writing to Kinesis streams.
+It is based on Apache Flink's `Kinesis connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kinesis.html>`_.
+The Kinesis I/O Module is configurable in Yaml or Java.
+
+.. contents:: :local:
+
+Dependency
+==========
+
+To use the Kinesis I/O Module in Java, please include the following dependency in your pom.
+
+.. code-block:: xml
+
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>statefun-kinesis-io</artifactId>
+        <version>{version}</version>
+        <scope>provided</scope>
+    </dependency>
+
+Kinesis Ingress Spec
+====================
+
+A ``KinesisIngressSpec`` declares an ingress spec for consuming from a Kinesis stream.
+
+It accepts the following arguments:
+
+1) The AWS region
+2) An AWS credentials provider
+3) A ``KinesisIngressDeserializer`` for deserializing data from Kinesis (Java only)
+4) The stream start position
+5) Properties for the Kinesis client
+6) The name of the stream to consume from
+
+.. tabs::
+
+    .. group-tab:: Java 
+
+        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/IngressSpecs.java
+            :language: java
+            :lines: 18-
+
+    .. group-tab:: Yaml
+
+        .. code-block:: yaml 
+
+            version: "1.0"
+
+            module:
+                meta:
+                    type: remote
+                spec:
+                    ingresses:
+                      - ingress:
+                          meta:
+                            type: statefun.kinesis.io/routable-protobuf-ingress
+                            id: example-namespace/messages
+                          spec:
+                            awsRegion:
+                              type: specific
+                              id: us-west-1
+                            awsCredentials:
+                              type: basic
+                              accessKeyId: my_access_key_id
+                              secretAccessKey: my_secret_access_key
+                            startupPosition:
+                              type: earliest
+                            streams:
+                              - stream: stream-1
+                                typeUrl: com.googleapis/com.mycomp.foo.Message
+                                targets:
+                                  - example-namespace/my-function-1
+                                  - example-namespace/my-function-2
+                              - stream: stream-2
+                                typeUrl: com.googleapis/com.mycomp.foo.Message
+                                targets:
+                                  - example-namespace/my-function-1
+                            clientConfigProperties:
+                              - SocketTimeout: 9999
+                              - MaxConnections: 15
+
+The ingress also accepts properties to directly configure the Kinesis client, using ``KinesisIngressBuilder#withClientConfigurationProperty()``.
+Please refer to the Kinesis `client configuration <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html>`_ documentation for the full list of available properties.
+Note that configuration passed using named methods will have higher precedence and overwrite their respective settings in the provided properties.
+
+Startup Position
+^^^^^^^^^^^^^^^^
+
+The ingress allows configuring the startup position to be one of the following:
+
+**Latest (default)**
+
+Start consuming from the latest position, i.e. head of the stream shards.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            KinesisIngressStartupPosition#fromLatest();
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            startupPosition:
+                type: latest
+
+**Earliest**
+
+Start consuming from the earliest position possible.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            KinesisIngressStartupPosition#fromEarliest();
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            startupPosition:
+                type: earliest
+
+
+**Date**
+
+Start consuming from records whose ingestion time is later than or equal to the specified date.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            KinesisIngressStartupPosition#fromDate(ZonedDateTime.now())
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            startupPosition:
+                type: date
+                date: 2020-02-01 04:15:00.00 Z
+
+Kinesis Deserializer
+^^^^^^^^^^^^^^^^^^^^
+
+The Kinesis ingress needs to know how to turn the binary data in Kinesis into Java objects.
+The ``KinesisIngressDeserializer`` allows users to specify such a schema.
+The ``T deserialize(IngressRecord ingressRecord)`` method gets called for each Kinesis record, passing the binary data and metadata from Kinesis.
+
+.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserDeserializer.java
+    :language: java
+    :lines: 18-
+
+Kinesis Egress Spec
+===================
+
+A ``KinesisEgressBuilder`` declares an egress spec for writing data out to a Kinesis stream.
+
+It accepts the following arguments:
+
+1) The egress identifier associated with this egress
+2) The AWS credentials provider
+3) A ``KinesisEgressSerializer`` for serializing data into Kinesis (Java only)
+4) The AWS region
+5) Properties for the Kinesis client
+6) The maximum number of outstanding records before backpressure is applied
+
+.. tabs::
+
+    .. group-tab:: Java 
+
+        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/EgressSpecs.java
+            :language: java
+            :lines: 18-
+
+    .. group-tab:: Yaml
+
+        .. code-block:: yaml 
+
+            version: "1.0"
+
+            module:
+                meta:
+                    type: remote
+                spec:
+                    egresses:
+                      - egress:
+                          meta:
+                            type: statefun.kinesis.io/generic-egress
+                            id: example/output-messages
+                          spec:
+                            awsRegion:
+                              type: custom-endpoint
+                              endpoint: https://localhost:4567
+                              id: us-west-1
+                            awsCredentials:
+                              type: profile
+                              profileName: john-doe
+                              profilePath: /path/to/profile/config
+                            maxOutstandingRecords: 9999
+                            clientConfigProperties:
+                              - ThreadingModel: POOLED
+                              - ThreadPoolSize: 10
+
+Please refer to the Kinesis `producer default configuration properties <https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties>`_ documentation for the full list of available properties.
+
+Kinesis Serializer
+^^^^^^^^^^^^^^^^^^
+
+The Kinesis egress needs to know how to turn Java objects into binary data.
+The ``KinesisEgressSerializer`` allows users to specify such a schema.
+The ``EgressRecord serialize(T value)`` method gets called for each message, allowing users to set a value, and other metadata.
+
+.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserSerializer.java
+    :language: java
+    :lines: 18-
+
+AWS Credentials
+===============
+
+Both the Kinesis ingress and egress can be configured using standard AWS credential providers.
+
+**Default Provider Chain (default)**
+
+Consults AWS’s default provider chain to determine the AWS credentials.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            AwsCredentials.fromDefaultProviderChain();
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            awsCredentials:
+              type: default
+
+**Basic**
+
+Specifies the AWS credentials directly with provided access key ID and secret access key strings.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            AwsCredentials.basic("accessKeyId", "secretAccessKey");
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            awsCredentials:
+              type: basic
+              accessKeyId: access-key-id
+              secretAccessKey: secret-access-key
+
+**Profile**
+
+Specifies the AWS credentials using an AWS configuration profile, along with the profile's configuration path.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            AwsCredentials.profile("profile-name", "/path/to/profile/config");
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            awsCredentials:
+              type: profile
+              profileName: profile-name
+              profilePath: /path/to/profile/config
+
+AWS Region
+==========
+
+Both the Kinesis ingress and egress can be configured to a specific AWS region.
+
+**Default Provider Chain (default)**
+
+Consults AWS's default provider chain to determine the AWS region.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            AwsRegion.fromDefaultProviderChain();
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            awsRegion:
+              type: default
+
+**Specific**
+
+Specifies an AWS region using the region's unique id.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            AwsRegion.of("us-west-1");
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            awsRegion:
+              type: specific
+              id: us-west-1
+
+**Custom Endpoint**
+
+Connects to an AWS region through a non-standard AWS service endpoint.
+This is typically used only for development and testing purposes.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            AwsRegion.ofCustomEndpoint("https://localhost:4567", "us-west-1");
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            awsRegion:
+              type: custom-endpoint
+              endpoint: https://localhost:4567
+              id: us-west-1
diff --git a/statefun-docs/pom.xml b/statefun-docs/pom.xml
index ea7ac1a..e9600cd 100644
--- a/statefun-docs/pom.xml
+++ b/statefun-docs/pom.xml
@@ -50,6 +50,11 @@ under the License.
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-kinesis-io</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
             <version>2.9.6</version>
diff --git a/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/EgressSpecs.java b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/EgressSpecs.java
new file mode 100644
index 0000000..773fbce
--- /dev/null
+++ b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/EgressSpecs.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
+
+public class EgressSpecs {
+  // Identifier of the example egress declared below, typed to User messages.
+  public static final EgressIdentifier<User> ID =
+      new EgressIdentifier<>("example", "output-egress", User.class);
+  // Kinesis egress for ID: region us-west-1, default credentials chain, UserSerializer for records.
+  public static final EgressSpec<User> kinesisEgress =
+      KinesisEgressBuilder.forIdentifier(ID)
+          .withAwsCredentials(AwsCredentials.fromDefaultProviderChain())
+          .withAwsRegion("us-west-1")
+          .withMaxOutstandingRecords(100)
+          .withClientConfigurationProperty("key", "value")
+          .withSerializer(UserSerializer.class)
+          .build();
+}
diff --git a/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/IngressSpecs.java b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/IngressSpecs.java
new file mode 100644
index 0000000..9a6d606
--- /dev/null
+++ b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/IngressSpecs.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition;
+
+public class IngressSpecs {
+  // Identifier of the example ingress declared below, typed to User messages.
+  public static final IngressIdentifier<User> ID =
+      new IngressIdentifier<>(User.class, "example", "input-ingress");
+  // Kinesis ingress for ID: reads "stream-name" in us-west-1 from the earliest position.
+  public static final IngressSpec<User> kinesisIngress =
+      KinesisIngressBuilder.forIdentifier(ID)
+          .withAwsRegion("us-west-1")
+          .withAwsCredentials(AwsCredentials.fromDefaultProviderChain())
+          .withDeserializer(UserDeserializer.class)
+          .withStream("stream-name")
+          .withStartupPosition(KinesisIngressStartupPosition.fromEarliest())
+          .withClientConfigurationProperty("key", "value")
+          .build();
+}
diff --git a/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserDeserializer.java b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserDeserializer.java
new file mode 100644
index 0000000..27669c6
--- /dev/null
+++ b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserDeserializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UserDeserializer implements KinesisIngressDeserializer<User> {
+  // Example deserializer: reads each ingress record's data as a User using Jackson.
+  private static final Logger LOG = LoggerFactory.getLogger(UserDeserializer.class);
+
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  @Override
+  public User deserialize(IngressRecord ingressRecord) {
+    try {
+      return mapper.readValue(ingressRecord.getData(), User.class);
+    } catch (IOException e) {
+      LOG.debug("Failed to deserialize record", e); // failure is only logged, not rethrown
+      return null; // a record that cannot be parsed yields null rather than an exception
+    }
+  }
+}
diff --git a/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserSerializer.java b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserSerializer.java
new file mode 100644
index 0000000..f145ba6
--- /dev/null
+++ b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserSerializer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.docs.io.kinesis;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.flink.statefun.docs.models.User;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link KinesisEgressSerializer} that writes each {@link User} as JSON bytes to the {@code
+ * user-stream} stream, using the user id as the partition key.
+ *
+ * <p>Users that cannot be serialized are logged and result in a {@code null} return value.
+ */
+public class UserSerializer implements KinesisEgressSerializer<User> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UserSerializer.class);
+
+  private static final String STREAM = "user-stream";
+
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  /**
+   * Builds the {@link EgressRecord} for the given user.
+   *
+   * @param value the user to serialize.
+   * @return the record to write, or {@code null} if serializing the user failed with an {@link
+   *     IOException}.
+   */
+  @Override
+  public EgressRecord serialize(User value) {
+    try {
+      return EgressRecord.newBuilder()
+          .withPartitionKey(value.getUserId())
+          .withData(mapper.writeValueAsBytes(value))
+          .withStream(STREAM)
+          .build();
+    } catch (IOException e) {
+      LOG.info("Failed to serialize user", e);
+      return null;
+    }
+  }
+}


[flink-statefun] 02/11: [FLINK-16124] [core] Bind AutoRoutableProtobufRouter for Routable Protobuf Kinesis ingresses

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit c85171cb19831a717a09c8899149ab6d07ea5109
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Mar 18 01:45:17 2020 +0800

    [FLINK-16124] [core] Bind AutoRoutableProtobufRouter for Routable Protobuf Kinesis ingresses
---
 .../apache/flink/statefun/flink/core/jsonmodule/JsonModule.java   | 8 +++++++-
 .../flink/core/protorouter/AutoRoutableProtobufRouter.java        | 4 +---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 14a769c..0d167c1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -55,6 +55,7 @@ import org.apache.flink.statefun.flink.core.jsonmodule.Pointers.Functions;
 import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
 import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
 import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
+import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
 import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
 import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
 import org.apache.flink.statefun.sdk.EgressType;
@@ -139,7 +140,7 @@ final class JsonModule implements StatefulFunctionModule {
       JsonIngressSpec<Message> ingressSpec = new JsonIngressSpec<>(type, id, ingress);
       binder.bindIngress(ingressSpec);
 
-      if (type.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)) {
+      if (isAutoRoutableIngress(type)) {
         binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
       }
     }
@@ -170,6 +171,11 @@ final class JsonModule implements StatefulFunctionModule {
     return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
   }
 
+  /**
+   * Tells whether the given ingress type is one of the routable Protobuf ingress types (Kafka or
+   * Kinesis) checked by this method.
+   */
+  private static boolean isAutoRoutableIngress(IngressType ingressType) {
+    if (ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)) {
+      return true;
+    }
+    return ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE);
+  }
+
   // ----------------------------------------------------------------------------------------------------------
   // Egresses
   // ----------------------------------------------------------------------------------------------------------
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
index 4e08369..eb37fe8 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
@@ -24,7 +24,6 @@ import com.google.protobuf.Message;
 import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
 import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
 import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
-import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.io.Router;
 
@@ -32,8 +31,7 @@ import org.apache.flink.statefun.sdk.io.Router;
  * A {@link Router} that recognizes messages of type {@link AutoRoutable}.
  *
  * <p>For each incoming {@code AutoRoutable}, this router forwards the wrapped payload to the
- * configured target addresses as a Protobuf {@link Any} message. This should only be attached to
- * ingress types of {@link ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE}.
+ * configured target addresses as a Protobuf {@link Any} message.
  */
 public final class AutoRoutableProtobufRouter implements Router<Message> {
 


[flink-statefun] 04/11: [FLINK-16124] [kinesis] Introduce KinesisEgressRecord protobuf message

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit db049d3bdb194490a9ca5738a0a6935d5fc60433
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Mar 20 14:29:12 2020 +0800

    [FLINK-16124] [kinesis] Introduce KinesisEgressRecord protobuf message
---
 .../src/main/protobuf/kinesis-egress.proto         | 30 ++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/statefun-flink/statefun-flink-io/src/main/protobuf/kinesis-egress.proto b/statefun-flink/statefun-flink-io/src/main/protobuf/kinesis-egress.proto
new file mode 100644
index 0000000..68c92c0
--- /dev/null
+++ b/statefun-flink/statefun-flink-io/src/main/protobuf/kinesis-egress.proto
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.flink.io;
+option java_package = "org.apache.flink.statefun.flink.io.generated";
+option java_multiple_files = true;
+
+// A single record to be written out through the generic Kinesis egress.
+message KinesisEgressRecord {
+    // Partition key of the outgoing record.
+    string partition_key = 1;
+    // Payload of the outgoing record.
+    bytes value_bytes = 2;
+    // Name of the stream the record is written to.
+    string stream = 3;
+    // Explicit hash key of the outgoing record; GenericKinesisEgressSerializer only applies it
+    // when it is non-empty.
+    string explicit_hash_key = 4;
+}


[flink-statefun] 06/11: [FLINK-16124] [kinesis] Implement runtime GenericKinesisSinkProvider

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit d0c74a754280aa31949b5aea96e50ac645aa9c52
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Mar 20 13:23:50 2020 +0800

    [FLINK-16124] [kinesis] Implement runtime GenericKinesisSinkProvider
---
 .../flink/io/kinesis/KinesisSinkProvider.java      |  2 +-
 .../polyglot/GenericKinesisEgressSerializer.java   | 62 +++++++++++++++
 .../polyglot/GenericKinesisSinkProvider.java       | 92 ++++++++++++++++++++++
 .../polyglot/KinesisEgressSpecJsonParser.java      | 43 ++++++++++
 .../io/kinesis/GenericKinesisSinkProviderTest.java | 50 ++++++++++++
 .../src/test/resources/generic-kinesis-egress.yaml | 32 ++++++++
 .../flink/io/kinesis/PolyglotKinesisIOTypes.java   |  4 +
 7 files changed, 284 insertions(+), 1 deletion(-)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java
index c1156b6..c969a0d 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java
@@ -26,7 +26,7 @@ import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 
-final class KinesisSinkProvider implements SinkProvider {
+public final class KinesisSinkProvider implements SinkProvider {
 
   @Override
   public <T> SinkFunction<T> forSpec(EgressSpec<T> spec) {
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java
new file mode 100644
index 0000000..db3db58
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.flink.statefun.flink.io.generated.KinesisEgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+
+/**
+ * A {@link KinesisEgressSerializer} for {@link Any} messages that wrap a {@link
+ * KinesisEgressRecord}; the fields of the wrapped record are copied into the resulting {@link
+ * EgressRecord}.
+ */
+public final class GenericKinesisEgressSerializer implements KinesisEgressSerializer<Any> {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public EgressRecord serialize(Any value) {
+    final KinesisEgressRecord unpacked = asKinesisEgressRecord(value);
+    final String hashKey = unpacked.getExplicitHashKey();
+
+    final EgressRecord.Builder recordBuilder =
+        EgressRecord.newBuilder()
+            .withData(unpacked.getValueBytes().toByteArray())
+            .withStream(unpacked.getStream())
+            .withPartitionKey(unpacked.getPartitionKey());
+
+    // the explicit hash key is only set on the outgoing record when one was actually provided
+    final boolean hasExplicitHashKey = hashKey != null && !hashKey.isEmpty();
+    if (hasExplicitHashKey) {
+      recordBuilder.withExplicitHashKey(hashKey);
+    }
+
+    return recordBuilder.build();
+  }
+
+  /**
+   * Unpacks the given {@link Any} as a {@link KinesisEgressRecord}.
+   *
+   * @throws IllegalStateException if the message does not wrap a {@link KinesisEgressRecord}.
+   * @throws RuntimeException if unpacking the wrapped message fails.
+   */
+  private static KinesisEgressRecord asKinesisEgressRecord(Any message) {
+    if (message.is(KinesisEgressRecord.class)) {
+      try {
+        return message.unpack(KinesisEgressRecord.class);
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(
+            "Unable to unpack message as a " + KinesisEgressRecord.class.getName(), e);
+      }
+    }
+    throw new IllegalStateException(
+        "The generic Kinesis egress expects only messages of type "
+            + KinesisEgressRecord.class.getName());
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisSinkProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisSinkProvider.java
new file mode 100644
index 0000000..ad8fc1f
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisSinkProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsCredentials;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsRegion;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisEgressSpecJsonParser.clientConfigProperties;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisEgressSpecJsonParser.optionalMaxOutstandingRecords;
+
+import com.google.protobuf.Any;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.io.kinesis.KinesisSinkProvider;
+import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
+import org.apache.flink.statefun.flink.io.spi.SinkProvider;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * A {@link SinkProvider} for {@link JsonEgressSpec}s: the spec JSON is translated into a {@link
+ * KinesisEgressSpec}, which is then handed to a {@link KinesisSinkProvider} to create the sink.
+ */
+public final class GenericKinesisSinkProvider implements SinkProvider {
+
+  private final KinesisSinkProvider delegateProvider = new KinesisSinkProvider();
+
+  @Override
+  public <T> SinkFunction<T> forSpec(EgressSpec<T> spec) {
+    return delegateProvider.forSpec(asKinesisEgressSpec(spec));
+  }
+
+  /**
+   * Builds a {@link KinesisEgressSpec} out of the JSON definition carried by the given spec.
+   *
+   * @throws IllegalArgumentException if the spec is not a {@link JsonEgressSpec}, or if its
+   *     identifier does not consume {@link Any} messages.
+   */
+  private static <T> KinesisEgressSpec<T> asKinesisEgressSpec(EgressSpec<T> spec) {
+    if (!(spec instanceof JsonEgressSpec)) {
+      throw new IllegalArgumentException("Wrong type " + spec.type());
+    }
+    final JsonEgressSpec<T> jsonSpec = (JsonEgressSpec<T>) spec;
+
+    final EgressIdentifier<T> egressId = jsonSpec.id();
+    validateConsumedType(egressId);
+
+    final JsonNode json = jsonSpec.specJson();
+    final KinesisEgressBuilder<T> builder = KinesisEgressBuilder.forIdentifier(egressId);
+
+    // optional settings are only applied when they are present in the spec JSON
+    optionalAwsRegion(json).ifPresent(builder::withAwsRegion);
+    optionalAwsCredentials(json).ifPresent(builder::withAwsCredentials);
+    optionalMaxOutstandingRecords(json).ifPresent(builder::withMaxOutstandingRecords);
+
+    clientConfigProperties(json)
+        .forEach((key, value) -> builder.withClientConfigurationProperty(key, value));
+
+    builder.withSerializer(serializerClass());
+
+    return builder.build();
+  }
+
+  /** Rejects any egress identifier whose consumed type is not exactly {@link Any}. */
+  private static void validateConsumedType(EgressIdentifier<?> id) {
+    final Class<?> consumedType = id.consumedType();
+    if (Any.class == consumedType) {
+      return;
+    }
+    throw new IllegalArgumentException(
+        "Generic Kinesis egress is only able to consume messages types of "
+            + Any.class.getName()
+            + " but "
+            + consumedType.getName()
+            + " is provided.");
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> Class<T> serializerClass() {
+    // this cast is safe, because we've already validated that the consumed type is Any.
+    return (Class<T>) GenericKinesisEgressSerializer.class;
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisEgressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisEgressSpecJsonParser.java
new file mode 100644
index 0000000..594623c
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisEgressSpecJsonParser.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import java.util.Map;
+import java.util.OptionalInt;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+
+/** Static helpers for reading Kinesis egress settings out of an egress spec JSON node. */
+final class KinesisEgressSpecJsonParser {
+
+  private KinesisEgressSpecJsonParser() {}
+
+  private static final JsonPointer MAX_OUTSTANDING_RECORDS_POINTER =
+      JsonPointer.compile("/maxOutstandingRecords");
+  private static final JsonPointer CLIENT_CONFIG_PROPS_POINTER =
+      JsonPointer.compile("/clientConfigProperties");
+
+  /**
+   * Reads the optional integer at {@code /maxOutstandingRecords} of the given egress spec node.
+   *
+   * @param egressSpecNode the JSON node holding the egress spec.
+   */
+  static OptionalInt optionalMaxOutstandingRecords(JsonNode egressSpecNode) {
+    return Selectors.optionalIntegerAt(egressSpecNode, MAX_OUTSTANDING_RECORDS_POINTER);
+  }
+
+  /**
+   * Reads the properties at {@code /clientConfigProperties} of the given egress spec node.
+   *
+   * @param egressSpecNode the JSON node holding the egress spec.
+   */
+  static Map<String, String> clientConfigProperties(JsonNode egressSpecNode) {
+    return Selectors.propertiesAt(egressSpecNode, CLIENT_CONFIG_PROPS_POINTER);
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/GenericKinesisSinkProviderTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/GenericKinesisSinkProviderTest.java
new file mode 100644
index 0000000..2a6b19b
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/GenericKinesisSinkProviderTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.apache.flink.statefun.flink.io.testutils.YamlUtils.loadAsJsonFromClassResource;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.protobuf.Any;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.io.kinesis.polyglot.GenericKinesisSinkProvider;
+import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.junit.Test;
+
+/** Checks that {@link GenericKinesisSinkProvider} produces a sink from a YAML egress definition. */
+public class GenericKinesisSinkProviderTest {
+
+  @Test
+  public void exampleUsage() {
+    JsonNode egressDefinition =
+        loadAsJsonFromClassResource(getClass().getClassLoader(), "generic-kinesis-egress.yaml");
+    JsonEgressSpec<?> spec =
+        new JsonEgressSpec<>(
+            PolyglotKinesisIOTypes.GENERIC_KINESIS_EGRESS_TYPE,
+            new EgressIdentifier<>("foo", "bar", Any.class),
+            egressDefinition);
+
+    GenericKinesisSinkProvider provider = new GenericKinesisSinkProvider();
+    SinkFunction<?> sink = provider.forSpec(spec);
+
+    // the provider is expected to hand back the Flink Kinesis producer as the sink
+    assertThat(sink, instanceOf(FlinkKinesisProducer.class));
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kinesis-egress.yaml b/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kinesis-egress.yaml
new file mode 100644
index 0000000..902822d
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kinesis-egress.yaml
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+egress:
+  meta:
+    type: statefun.kinesis.io/generic-egress
+    id: com.mycomp.foo/bar
+  spec:
+    awsRegion:
+      type: custom-endpoint
+      endpoint: https://localhost:4567
+      id: us-west-1
+    awsCredentials:
+      type: profile
+      profileName: john-doe
+      profilePath: /path/to/profile/config
+    maxOutstandingRecords: 9999
+    clientConfigProperties:
+      - ThreadingModel: POOLED
+      - ThreadPoolSize: 10
diff --git a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java
index c6b384e..252e59d 100644
--- a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java
+++ b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.statefun.flink.io.kinesis;
 
+import org.apache.flink.statefun.sdk.EgressType;
 import org.apache.flink.statefun.sdk.IngressType;
 
 public final class PolyglotKinesisIOTypes {
@@ -26,4 +27,7 @@ public final class PolyglotKinesisIOTypes {
 
   public static final IngressType ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE =
       new IngressType("statefun.kinesis.io", "routable-protobuf-ingress");
+
+  public static final EgressType GENERIC_KINESIS_EGRESS_TYPE =
+      new EgressType("statefun.kinesis.io", "generic-egress");
 }


[flink-statefun] 08/11: [hotfix][docs] Add tabs and fix broken links

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit dcfa75da6e70af00df7d1c0ffa36a6f7e41da738
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Sat Mar 21 15:03:01 2020 -0500

    [hotfix][docs] Add tabs and fix broken links
---
 statefun-docs/docs/_static/css/customize-theme.css | 15 ++++++++++-
 statefun-docs/docs/_templates/layout.html          |  1 +
 statefun-docs/docs/_templates/searchbox.html       |  0
 statefun-docs/docs/conf.py                         | 20 +++------------
 statefun-docs/docs/index_grid.html                 |  5 ++--
 statefun-docs/requirements.txt                     | 29 +++++++++++++++++++++-
 statefun-docs/runtime.txt                          |  2 +-
 7 files changed, 50 insertions(+), 22 deletions(-)

diff --git a/statefun-docs/docs/_static/css/customize-theme.css b/statefun-docs/docs/_static/css/customize-theme.css
index d5ca922..e295514 100644
--- a/statefun-docs/docs/_static/css/customize-theme.css
+++ b/statefun-docs/docs/_static/css/customize-theme.css
@@ -51,4 +51,17 @@
 
 .scv-banner > a {
   color: white ! important;
-}
\ No newline at end of file
+}
+
+.sphinx-tabs .sphinx-menu a.item {
+    color: #446D6F !important;
+}
+
+.sphinx-tabs .sphinx-menu a.active.item {
+    border-color: #446D6F !important;
+}
+
+.sphinx-tab {
+    border-color: #446D6F !important;
+}
+
diff --git a/statefun-docs/docs/_templates/layout.html b/statefun-docs/docs/_templates/layout.html
index 89acd5a..05c177b 100644
--- a/statefun-docs/docs/_templates/layout.html
+++ b/statefun-docs/docs/_templates/layout.html
@@ -3,6 +3,7 @@
 {%- set favicon = 'favicon.png' %}
 {%- set logo = 'logo.png' %}
 {%- set theme_logo_only = True %}
+{%- set extra_css_files = ['_static/css/customize-theme.css'] %}
 
 {% block sidebartitle %}
 
diff --git a/statefun-docs/docs/_templates/searchbox.html b/statefun-docs/docs/_templates/searchbox.html
deleted file mode 100644
index e69de29..0000000
diff --git a/statefun-docs/docs/conf.py b/statefun-docs/docs/conf.py
index d8555e9..7817989 100644
--- a/statefun-docs/docs/conf.py
+++ b/statefun-docs/docs/conf.py
@@ -1,3 +1,4 @@
+
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -32,7 +33,8 @@ import re
 # Add any Sphinx extension module names here, as strings. They can be
 # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
 # ones.
-extensions = ['sphinx.ext.ifconfig']
+# https://github.com/djungelorm/sphinx-tabs
+extensions = ['sphinx.ext.ifconfig', 'sphinx_tabs.tabs']
 
 # Add any paths that contain templates here, relative to this directory.
 templates_path = ['_templates']
@@ -106,8 +108,6 @@ todo_include_todos = False
 #
 html_theme = 'sphinx_rtd_theme'
 
-theme_prev_next_buttons_location = 'Top'
-
 # Theme options are theme-specific and customize the look and feel of a theme
 # further.  For a list of options available for each theme, see the
 # documentation.
@@ -139,22 +139,11 @@ html_sidebars = {
     ]
 }
 
-# Custom CSS.
-html_css_files = [
-    'css/customize-theme.css',
-]
-
-html_context = {
-    'css_files': ['_static/css/customize-theme.css']
-}
-
-
 # -- Options for HTMLHelp output ------------------------------------------
 
 # Output file base name for HTML help builder.
 htmlhelp_basename = 'StatefulFunctionsdoc'
 
-
 # -- Options for LaTeX output ---------------------------------------------
 
 latex_elements = {
@@ -180,7 +169,7 @@ latex_elements = {
 #  author, documentclass [howto, manual, or own class]).
 latex_documents = [
     (master_doc, 'StatefulFunctions.tex', u'Stateful Functions Documentation',
-     u'Ververica GmbH', 'manual'),
+     u'Apache Flink', 'manual'),
 ]
 
 
@@ -205,7 +194,6 @@ texinfo_documents = [
      'Miscellaneous'),
 ]
 
-
 # -- Settings for sphinxcontrib-versioning --------------------------------
 scv_greatest_tag = True
 scv_show_banner = True
diff --git a/statefun-docs/docs/index_grid.html b/statefun-docs/docs/index_grid.html
index 4f8b2bb..e190a3d 100644
--- a/statefun-docs/docs/index_grid.html
+++ b/statefun-docs/docs/index_grid.html
@@ -49,11 +49,10 @@ under the License.
 
    <div class="container">
      <div class="btn-toolbar">
-  		<button onclick="window.location.href = '/overview/index.html';">Learn More</button>
-  		<button onclick="window.location.href = '/api_concepts/index.html';">Read The Docs</button>
+  		<button onclick="window.location.href = '/concepts/index.html';">Read The Docs</button>
   		<button onclick="window.location.href = '/getting_started/index.html';">Get Started!</button>
 	 </div>
 
      </div>
 
-</html>
\ No newline at end of file
+</html>
diff --git a/statefun-docs/requirements.txt b/statefun-docs/requirements.txt
index 97882e2..3f5e986 100644
--- a/statefun-docs/requirements.txt
+++ b/statefun-docs/requirements.txt
@@ -1,5 +1,32 @@
+alabaster==0.7.12
+argh==0.26.2
+Babel==2.6.0
+certifi==2019.3.9
+chardet==3.0.4
+Click==7.0
+colorclass==2.2.0
+docutils==0.14
+idna==2.8
+imagesize==1.1.0
+Jinja2==2.10.1
+livereload==2.6.0
+MarkupSafe==1.1.1
+packaging==19.0
+pathtools==0.1.2
+port-for==0.3.1
+Pygments==2.3.1
+pyparsing==2.4.0
+pytz==2019.1
+PyYAML==5.1
+requests==2.21.0
 six==1.11.0
+snowballstemmer==1.2.1
 Sphinx==1.7.9
 sphinx-autobuild==0.7.1
-sphinx_rtd_theme==0.4.1
+sphinx-rtd-theme==0.4.1
+sphinx-tabs==1.1.13
 sphinxcontrib-versioning==2.2.1
+sphinxcontrib-websupport==1.1.0
+tornado==6.0.2
+urllib3==1.24.1
+watchdog==0.9.0
diff --git a/statefun-docs/runtime.txt b/statefun-docs/runtime.txt
index 475ba51..d70c8f8 100644
--- a/statefun-docs/runtime.txt
+++ b/statefun-docs/runtime.txt
@@ -1 +1 @@
-3.7
+3.6


[flink-statefun] 10/11: [FLINK-16557][docs] Document YAML-ized Kafka egresses / ingresses in Stateful Functions documentation

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit b3a9ea1831ba1107fd9cdfc6d3eb405b6b9ef6e0
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Mar 17 10:05:31 2020 -0500

    [FLINK-16557][docs] Document YAML-ized Kafka egresses / ingresses in Stateful Functions documentation
    
    This closes #63.
---
 statefun-docs/docs/io_module/index.rst |   6 +-
 statefun-docs/docs/io_module/kafka.rst | 278 +++++++++++++++++++++++++++++----
 2 files changed, 250 insertions(+), 34 deletions(-)

diff --git a/statefun-docs/docs/io_module/index.rst b/statefun-docs/docs/io_module/index.rst
index 0b86808..ec2f299 100644
--- a/statefun-docs/docs/io_module/index.rst
+++ b/statefun-docs/docs/io_module/index.rst
@@ -32,7 +32,7 @@ Based on the concept of Ingress (input) and Egress (output) points, and built on
 .. _ingress:
 
 Ingress
-^^^^^^^^
+=======
 
 An Ingress is an input point where data is consumed from an external system and forwarded to zero or more functions.
 It is defined via an ``IngressIdentifier`` and an ``IngressSpec``.
@@ -71,7 +71,7 @@ The spec defines the details of how to connect to the external system, which is
                           spec: # ingress specific configurations
 
 Router
-""""""
+^^^^^^
 
 A router is a stateless operator that takes each record from an ingress and routes it to zero or more functions.
 Routers are bound to the system via a stateful function module, and unlike other components, an ingress may have any number of routers.
@@ -101,7 +101,7 @@ Routers are bound to the system via a stateful function module, and unlike other
 .. _egress:
 
 Egress
-^^^^^^
+======
 
 Egress is the opposite of ingress; it is a point that takes messages and writes them to external systems.
 Each egress is defined using two components, an ``EgressIdentifier`` and an ``EgressSpec``.
diff --git a/statefun-docs/docs/io_module/kafka.rst b/statefun-docs/docs/io_module/kafka.rst
index 1f4639d..3019004 100644
--- a/statefun-docs/docs/io_module/kafka.rst
+++ b/statefun-docs/docs/io_module/kafka.rst
@@ -19,13 +19,14 @@ Apache Kafka
 
 Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics.
 It is based on Apache Flink's universal `Kafka connector <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html>`_ and provides exactly-once processing semantics.
+The Kafka I/O Module is configurable in Yaml or Java.
 
 .. contents:: :local:
 
 Dependency
-===========
+==========
 
-To use the Kafka I/O Module, please include the following dependency in your pom.
+To use the Kafka I/O Module in Java, please include the following dependency in your pom.
 
 .. code-block:: xml
 
@@ -36,10 +37,10 @@ To use the Kafka I/O Module, please include the following dependency in your pom
         <scope>provided</scope>
     </dependency>
 
-Kafka Ingress Builder
-=====================
+Kafka Ingress Spec
+==================
 
-A ``KafkaIngressBuilder`` declares an ingress spec for consuming from Kafka cluster. 
+A ``KafkaIngressSpec`` declares an ingress spec for consuming from a Kafka cluster.
 
 It accepts the following arguments: 
 
@@ -47,33 +48,167 @@ It accepts the following arguments:
 2) The topic name / list of topic names
 3) The address of the bootstrap servers
 4) The consumer group id to use
-5) A ``KafkaIngressDeserializer`` for deserializing data from Kafka
+5) A ``KafkaIngressDeserializer`` for deserializing data from Kafka (Java only)
 6) The position to start consuming from
 
-.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/IngressSpecs.java
-    :language: java
-    :lines: 18-
+.. tabs:: 
 
-The ingress allows configuring the startup position to be one of the following:
+    .. group-tab:: Java
 
-* ``KafkaIngressStartupPosition#fromGroupOffsets()`` (default): starts from offsets that were committed to Kafka for the specified consumer group.
-* ``KafkaIngressStartupPosition#fromEarliest()``: starts from the earliest offset.
-* ``KafkaIngressStartupPosition#fromLatest()``: starts from the latest offset.
-* ``KafkaIngressStartupPosition#fromSpecificOffsets(Map)``: starts from specific offsets, defined as a map of partitions to their target starting offset.
-* ``KafkaIngressStartupPosition#fromDate(Date)``: starts from offsets that have an ingestion time larger than or equal to a specified date.
+        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/IngressSpecs.java
+            :language: java
+            :lines: 18-
 
-On startup, if the specified startup offset for a partition is out-of-range or does not exist (which may be the case if the ingress is configured to
-start from group offsets, specific offsets, or from a date), then the ingress will fallback to using the position configured
-using ``KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)``. By default, this is set to be the latest position.
+    .. group-tab:: Yaml
+
+        .. code-block:: yaml
+
+            version: "1.0"
+
+            module:
+                meta:
+                    type: remote
+                spec:
+                    ingresses:
+                    - ingress:
+                        meta:
+                            type: statefun.kafka.io/routable-protobuf-ingress
+                            id: example/user-ingress
+                        spec:
+                            address: kafka-broker:9092
+                            consumerGroupId: routable-kafka-e2e
+                            startupPosition:
+                                type: earliest
+                            topics:
+                              - topic: messages-1
+                                typeUrl: com.googleapis/com.company.MessageWithAddress
+                                targets:
+                                  - example-namespace/my-function-1
+                                  - example-namespace/my-function-2
 
 The ingress also accepts properties to directly configure the Kafka client, using ``KafkaIngressBuilder#withProperties(Properties)``.
 Please refer to the Kafka `consumer configuration <https://docs.confluent.io/current/installation/configuration/consumer-configs.html>`_ documentation for the full list of available properties.
 Note that configuration passed using named methods, such as ``KafkaIngressBuilder#withConsumerGroupId(String)``, will have higher precedence and overwrite their respective settings in the provided properties.
 
+Startup Position
+^^^^^^^^^^^^^^^^
+
+The ingress allows configuring the startup position to be one of the following:
+
+**From Group Offset (default)**
+
+Starts from offsets that were committed to Kafka for the specified consumer group.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            KafkaIngressStartupPosition#fromGroupOffsets();
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            startupPosition:
+                type: group-offsets
+
+**Earliest** 
+
+Starts from the earliest offset.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            KafkaIngressStartupPosition#fromEarliest();
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            startupPosition:
+                type: earliest
+
+**Latest**
+
+Starts from the latest offset.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            KafkaIngressStartupPosition#fromLatest();
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            startupPosition:
+                type: latest
+
+**Specific Offsets**
+
+Starts from specific offsets, defined as a map of partitions to their target starting offset.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            Map<TopicPartition, Long> offsets = new HashMap<>();
+            offsets.put(new TopicPartition("user-topic", 0), 91L);
+            offsets.put(new TopicPartition("user-topic", 1), 11L);
+            offsets.put(new TopicPartition("user-topic", 2), 8L);
+
+            KafkaIngressStartupPosition#fromSpecificOffsets(offsets);
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            startupPosition:
+                type: specific-offsets
+                offsets:
+                    - user-topic/0: 91
+                    - user-topic/1: 11
+                    - user-topic/2: 8
+
+**Date**
+
+Starts from offsets that have an ingestion time larger than or equal to a specified date.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            KafkaIngressStartupPosition#fromDate(ZonedDateTime.now());
+
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml 
+
+            startupPosition:
+                type: date
+                date: 2020-02-01 04:15:00.00 Z
+
+On startup, if the specified startup offset for a partition is out-of-range or does not exist (which may be the case if the ingress is configured to
+start from group offsets, specific offsets, or from a date), then the ingress will fall back to using the position configured
+using ``KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)``.
+By default, this is set to be the latest position.
+
 Kafka Deserializer
-""""""""""""""""""
+^^^^^^^^^^^^^^^^^^
 
-The Kafka ingress needs to know how to turn the binary data in Kafka into Java objects.
+When using the Java API, the Kafka ingress needs to know how to turn the binary data in Kafka into Java objects.
 The ``KafkaIngressDeserializer`` allows users to specify such a schema.
 The ``T deserialize(ConsumerRecord<byte[], byte[]> record)`` method gets called for each Kafka message, passing the key, value, and metadata from Kafka.
 
@@ -90,30 +225,111 @@ It accepts the following arguments:
 
 1) The egress identifier associated with this egress
 2) The address of the bootstrap servers
-3) A ``KafkaEgressSerializer`` for serializing data into Kafka
+3) A ``KafkaEgressSerializer`` for serializing data into Kafka (Java only)
 4) The fault tolerance semantic
 5) Properties for the Kafka producer
 
-.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/EgressSpecs.java
-    :language: java
-    :lines: 18-
+.. tabs::
+
+    .. group-tab:: Java 
+
+        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/EgressSpecs.java
+            :language: java
+            :lines: 18-
+    
+    .. group-tab:: Yaml 
+
+        .. code-block:: yaml
+
+            version: "1.0"
+
+            module:
+                meta:
+                    type: remote
+                spec:
+                    egresses:
+                      - egress:
+                          meta:
+                            type: statefun.kafka.io/generic-egress
+                            id: example/output-messages
+                          spec:
+                            address: kafka-broker:9092
+                            deliverySemantic:
+                              type: exactly-once
+                              transactionTimeoutMillis: 100000
+                            properties:
+                              - foo.config: bar
 
 Please refer to the Kafka `producer configuration <https://docs.confluent.io/current/installation/configuration/producer-configs.html>`_ documentation for the full list of available properties.
 
 Kafka Egress and Fault Tolerance
-""""""""""""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees.
-You can choose three different modes of operating based through the ``KafkaEgressBuilder``.
+You can choose three different modes of operation.
+
+**None**
+
+Nothing is guaranteed, produced records can be lost or duplicated.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            KafkaEgressBuilder#withNoProducerSemantics();
+
+    .. group-tab:: Yaml
+
+        .. code-block:: yaml 
+
+            deliverySemantic:
+                type: none
+
+**At Least Once**
+
+Stateful Functions will guarantee that no records will be lost but they can be duplicated.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            KafkaEgressBuilder#withAtLeastOnceProducerSemantics();
+
+    .. group-tab:: Yaml
+
+        .. code-block:: yaml 
+
+            deliverySemantic:
+                type: at-least-once
+
+**Exactly Once**
+
+Stateful Functions uses Kafka transactions to provide exactly-once semantics.
+
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. code-block:: none
+
+            KafkaEgressBuilder#withExactlyOnceProducerSemantics(Duration.ofMinutes(15));
+
+    .. group-tab:: Yaml
+
+        .. code-block:: yaml 
 
-* ``KafkaEgressBuilder#withNoProducerSemantics``: Nothing is guaranteed. Produced records can be lost or duplicated.
-* ``KafkaEgressBuilder#withAtLeastOnceProducerSemantics``: Stateful Functions will guarantee that nor records will be lost but they can be duplicated.
-* ``KafkaEgressBuilder#withExactlyOnceProducerSemantics``: Stateful Functions uses Kafka transactions to provide exactly-once semantics.
+            deliverySemantic:
+                type: exactly-once
+                transactionTimeoutMillis: 900000 # 15 min 
 
 Kafka Serializer
-""""""""""""""""
+^^^^^^^^^^^^^^^^
 
-The Kafka egress needs to know how to turn Java objects into binary data.
+When using the Java API, the Kafka egress needs to know how to turn Java objects into binary data.
 The ``KafkaEgressSerializer`` allows users to specify such a schema.
 The ``ProducerRecord<byte[], byte[]> serialize(T out)`` method gets called for each message, allowing users to set a key, value, and other metadata.
 


[flink-statefun] 09/11: [FLINK-16557][docs] Document basic yaml i/o modules

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 7fcdae27aa1dfe853ea8414643907d2cdb78cc4e
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Sat Mar 21 15:55:18 2020 -0500

    [FLINK-16557][docs] Document basic yaml i/o modules
---
 statefun-docs/docs/io_module/index.rst | 109 +++++++++++++++++++++++----------
 1 file changed, 77 insertions(+), 32 deletions(-)

diff --git a/statefun-docs/docs/io_module/index.rst b/statefun-docs/docs/io_module/index.rst
index e9df63e..0b86808 100644
--- a/statefun-docs/docs/io_module/index.rst
+++ b/statefun-docs/docs/io_module/index.rst
@@ -35,37 +35,69 @@ Ingress
 ^^^^^^^^
 
 An Ingress is an input point where data is consumed from an external system and forwarded to zero or more functions.
-An ``IngressIdentifier`` and an ``IngressSpec`` define it.
+It is defined via an ``IngressIdentifier`` and an ``IngressSpec``.
 
 An ingress identifier, similar to a function type, uniquely identifies an ingress by specifying its input type, a namespace, and a name.
 
-.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/Identifiers.java
-    :language: java
-    :lines: 18-
+The spec defines the details of how to connect to the external system, which is specific to each individual I/O module. Each identifier-spec pair is bound to the system inside a stateful function module.
 
-The spec defines the details of how to connect to the external system, which is specific to each individual I/O module.
-Each identifier-spec pair is bound to the system inside an stateful function module.
+.. tabs:: 
 
-.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/ModuleWithIngress.java
-    :language: java
-    :lines: 18-
+    .. group-tab:: Java
+
+        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/Identifiers.java
+            :language: java
+            :lines: 18-
+
+        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/ModuleWithIngress.java
+            :language: java
+            :lines: 18-
+
+    .. group-tab:: Yaml
+
+        .. code-block:: Yaml
+
+           version: "1.0"
+
+           module:
+                meta:
+                    type: remote
+                spec:
+                    ingresses:
+                      - ingress:
+                          meta:
+                            id: example/user-ingress      
+                            type: # ingress type
+                          spec: # ingress specific configurations
 
 Router
 """"""
 
 A router is a stateless operator that takes each record from an ingress and routes it to zero or more functions.
+Routers are bound to the system via a stateful function module, and unlike other components, an ingress may have any number of routers.
 
-.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/UserRouter.java
-    :language: java
-    :lines: 18-
+.. tabs::
 
-Routers are bound to the system via a stateful function module.
-Unlike other components, an ingress may have any number of routers.
+    .. group-tab:: Java
 
-.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/ModuleWithRouter.java
-    :language: java
-    :lines: 18-
+        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/UserRouter.java
+            :language: java
+            :lines: 18-
+
+        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/ModuleWithRouter.java
+            :language: java
+            :lines: 18-
+
+    .. group-tab:: Yaml
+
+        When defined in ``yaml``, routers are defined by a list of function types.
+        The ``id`` component of the address is pulled from the key associated with each record in its underlying source implementation. 
 
+        .. code-block:: Yaml
+
+            targets:
+              - example-namespace/my-function-1
+              - example-namespace/my-function-2  
 .. _egress:
 
 Egress
@@ -75,27 +107,40 @@ Egress is the opposite of ingress; it is a point that takes messages and writes
 Each egress is defined using two components, an ``EgressIdentifier`` and an ``EgressSpec``.
 
 An egress identifier uniquely identifies an egress based on a namespace, name, and producing type.
-
-.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/egress/Identifiers.java
-    :language: java
-    :lines: 18-
-
 An egress spec defines the details of how to connect to the external system, the details are specific to each individual I/O module.
 Each identifier-spec pair is bound to the system inside a stateful function module.
 
-.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/egress/ModuleWithEgress.java
-    :language: java
-    :lines: 18-
+.. tabs::
+
+    .. group-tab:: Java
+
+        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/egress/Identifiers.java
+            :language: java
+            :lines: 18-
+
+        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/egress/ModuleWithEgress.java
+            :language: java
+            :lines: 18-
+
+    .. group-tab:: Yaml
+
+        .. code-block:: Yaml 
+    
+            version: "1.0"
+
+            module:
+                meta:
+                    type: remote
+                spec:
+                    egresses:
+                      - egress:
+                          meta:
+                            id: example/user-egress
+                            type: # egress type
+                          spec: # egress specific configurations
 
 Stateful functions may then message an egress the same way they message another function, passing the egress identifier as function type.
 
 .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/egress/FnOutputting.java
     :language: java
     :lines: 18-
-
-I/O modules leverage `Java’s Service Provider Interfaces (SPI) <https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html>`_ for discovery.
-This means that every JAR should contain a file ``org.apache.flink.statefun.sdk.spi.StatefulFunctionModule`` in the ``META_INF/services`` resource directory that lists all available modules that it provides.
-
-.. code-block:: yaml
-
-    BasicFunctionModule


[flink-statefun] 01/11: [FLINK-16124] [kinesis] Implement runtime RoutableProtobufKinesisSourceProvider

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 41c900e16efa1bbe29990701ce0be03a6da08df8
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Mar 18 01:44:23 2020 +0800

    [FLINK-16124] [kinesis] Implement runtime RoutableProtobufKinesisSourceProvider
---
 .../flink/io/kinesis/KinesisSourceProvider.java    |   2 +-
 .../io/kinesis/polyglot/AwsAuthSpecJsonParser.java | 130 ++++++++++++++++++
 .../polyglot/KinesisIngressSpecJsonParser.java     | 146 +++++++++++++++++++++
 ...RoutableProtobufKinesisIngressDeserializer.java |  59 +++++++++
 .../RoutableProtobufKinesisSourceProvider.java     |  98 ++++++++++++++
 .../ingress/KinesisIngressBuilderApiExtension.java |  28 ++++
 .../RoutableProtobufKinesisSourceProviderTest.java |  52 ++++++++
 .../routable-protobuf-kinesis-ingress.yaml         |  42 ++++++
 .../flink/io/kinesis/PolyglotKinesisIOTypes.java   |  29 ++++
 9 files changed, 585 insertions(+), 1 deletion(-)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
index 1a0ab09..93e322e 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 
-final class KinesisSourceProvider implements SourceProvider {
+public final class KinesisSourceProvider implements SourceProvider {
 
   @Override
   public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java
new file mode 100644
index 0000000..47c26e4
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+final class AwsAuthSpecJsonParser { // Static helpers that read the optional AWS region / credentials settings out of a JSON spec node.
+
+  private AwsAuthSpecJsonParser() {} // never instantiated; only the static methods below are used
+
+  private static final JsonPointer AWS_REGION_POINTER = JsonPointer.compile("/awsRegion"); // region settings, relative to the spec node
+  private static final JsonPointer AWS_CREDENTIALS_POINTER = JsonPointer.compile("/awsCredentials"); // credentials settings, relative to the spec node
+
+  private static final class Region { // accepted "type" values and field pointers inside the /awsRegion node
+    private static final String DEFAULT_TYPE = "default";
+    private static final String SPECIFIED_ID_TYPE = "specific";
+    private static final String CUSTOM_ENDPOINT_TYPE = "custom-endpoint";
+
+    private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type");
+    private static final JsonPointer ID_POINTER = JsonPointer.compile("/id");
+    private static final JsonPointer ENDPOINT_POINTER = JsonPointer.compile("/endpoint");
+  }
+
+  private static final class Credentials { // accepted "type" values and field pointers inside the /awsCredentials node
+    private static final String DEFAULT_TYPE = "default";
+    private static final String BASIC_TYPE = "basic";
+    private static final String PROFILE_TYPE = "profile";
+
+    private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type");
+    private static final JsonPointer ACCESS_KEY_ID_POINTER = JsonPointer.compile("/accessKeyId");
+    private static final JsonPointer SECRET_ACCESS_KEY_POINTER =
+        JsonPointer.compile("/secretAccessKey");
+    private static final JsonPointer PROFILE_NAME_POINTER = JsonPointer.compile("/profileName");
+    private static final JsonPointer PROFILE_PATH_POINTER = JsonPointer.compile("/profilePath");
+  }
+
+  static Optional<AwsRegion> optionalAwsRegion(JsonNode specNode) { // returns empty when /awsRegion is absent; otherwise builds an AwsRegion from its "type"
+    final JsonNode awsRegionSpecNode = specNode.at(AWS_REGION_POINTER);
+    if (awsRegionSpecNode.isMissingNode()) {
+      return Optional.empty(); // no region configured in the spec
+    }
+
+    final String type = Selectors.textAt(awsRegionSpecNode, Region.TYPE_POINTER); // NOTE(review): presumably fails if /type is missing - confirm against Selectors
+    switch (type) {
+      case Region.DEFAULT_TYPE:
+        return Optional.of(AwsRegion.fromDefaultProviderChain());
+      case Region.SPECIFIED_ID_TYPE:
+        return Optional.of(AwsRegion.ofId(Selectors.textAt(awsRegionSpecNode, Region.ID_POINTER)));
+      case Region.CUSTOM_ENDPOINT_TYPE:
+        return Optional.of(
+            AwsRegion.ofCustomEndpoint(
+                Selectors.textAt(awsRegionSpecNode, Region.ENDPOINT_POINTER), // first argument: /endpoint
+                Selectors.textAt(awsRegionSpecNode, Region.ID_POINTER))); // second argument: /id
+      default: // unrecognized type: reject and list the accepted values in the message
+        final List<String> validValues =
+            Arrays.asList(
+                Region.DEFAULT_TYPE, Region.SPECIFIED_ID_TYPE, Region.CUSTOM_ENDPOINT_TYPE);
+        throw new IllegalArgumentException(
+            "Invalid AWS region type: "
+                + type
+                + "; valid values are ["
+                + String.join(", ", validValues)
+                + "]");
+    }
+  }
+
+  static Optional<AwsCredentials> optionalAwsCredentials(JsonNode specNode) { // returns empty when /awsCredentials is absent; otherwise builds AwsCredentials from its "type"
+    final JsonNode awsCredentialsSpecNode = specNode.at(AWS_CREDENTIALS_POINTER);
+    if (awsCredentialsSpecNode.isMissingNode()) {
+      return Optional.empty(); // no credentials configured in the spec
+    }
+
+    final String type = Selectors.textAt(awsCredentialsSpecNode, Credentials.TYPE_POINTER); // NOTE(review): presumably fails if /type is missing - confirm against Selectors
+    switch (type) {
+      case Credentials.DEFAULT_TYPE:
+        return Optional.of(AwsCredentials.fromDefaultProviderChain());
+      case Credentials.BASIC_TYPE:
+        return Optional.of(
+            AwsCredentials.basic(
+                Selectors.textAt(awsCredentialsSpecNode, Credentials.ACCESS_KEY_ID_POINTER),
+                Selectors.textAt(awsCredentialsSpecNode, Credentials.SECRET_ACCESS_KEY_POINTER)));
+      case Credentials.PROFILE_TYPE:
+        final Optional<String> path = // /profilePath is optional; it selects which AwsCredentials.profile overload is called
+            Selectors.optionalTextAt(awsCredentialsSpecNode, Credentials.PROFILE_PATH_POINTER);
+        if (path.isPresent()) {
+          return Optional.of(
+              AwsCredentials.profile(
+                  Selectors.textAt(awsCredentialsSpecNode, Credentials.PROFILE_NAME_POINTER),
+                  path.get()));
+        } else {
+          return Optional.of(
+              AwsCredentials.profile(
+                  Selectors.textAt(awsCredentialsSpecNode, Credentials.PROFILE_NAME_POINTER)));
+        }
+      default: // unrecognized type: reject and list the accepted values in the message
+        final List<String> validValues =
+            Arrays.asList(
+                Credentials.DEFAULT_TYPE, Credentials.BASIC_TYPE, Credentials.PROFILE_TYPE);
+        throw new IllegalArgumentException(
+            "Invalid AWS credential type: "
+                + type
+                + "; valid values are ["
+                + String.join(", ", validValues)
+                + "]");
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java
new file mode 100644
index 0000000..823cdfb
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition;
+
+/**
+ * Static helpers that read the individual sections of a Kinesis ingress spec given as a JSON node:
+ * the startup position, the client configuration properties and the routable streams.
+ */
+final class KinesisIngressSpecJsonParser {
+
+  private static final JsonPointer STREAMS_POINTER = JsonPointer.compile("/streams");
+  private static final JsonPointer STARTUP_POSITION_POINTER =
+      JsonPointer.compile("/startupPosition");
+  private static final JsonPointer CLIENT_CONFIG_PROPS_POINTER =
+      JsonPointer.compile("/clientConfigProperties");
+
+  /** Pointers into a single entry of the {@code /streams} list. */
+  private static final class Streams {
+    private static final JsonPointer NAME_POINTER = JsonPointer.compile("/stream");
+    private static final JsonPointer TYPE_URL_POINTER = JsonPointer.compile("/typeUrl");
+    private static final JsonPointer TARGETS_POINTER = JsonPointer.compile("/targets");
+  }
+
+  /** Pointers, accepted type names and date format of the {@code /startupPosition} node. */
+  private static final class StartupPosition {
+    private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type");
+    private static final JsonPointer DATE_POINTER = JsonPointer.compile("/date");
+
+    private static final String EARLIEST_TYPE = "earliest";
+    private static final String LATEST_TYPE = "latest";
+    private static final String DATE_TYPE = "date";
+
+    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z";
+    private static final DateTimeFormatter DATE_FORMATTER =
+        DateTimeFormatter.ofPattern(DATE_PATTERN);
+  }
+
+  private KinesisIngressSpecJsonParser() {}
+
+  /**
+   * Reads the {@code /startupPosition} node of the ingress spec, if there is one.
+   *
+   * @param ingressSpecNode the JSON node of the ingress spec
+   * @return empty if the node is missing, otherwise the startup position matching its type
+   * @throws IllegalArgumentException if the type is not one of the accepted values, or if the date
+   *     of a date-typed position cannot be parsed
+   */
+  static Optional<KinesisIngressStartupPosition> optionalStartupPosition(JsonNode ingressSpecNode) {
+    final JsonNode positionNode = ingressSpecNode.at(STARTUP_POSITION_POINTER);
+    return positionNode.isMissingNode()
+        ? Optional.empty()
+        : Optional.of(startupPosition(positionNode));
+  }
+
+  /**
+   * Reads the properties found at {@code /clientConfigProperties} of the ingress spec.
+   *
+   * @param ingressSpecNode the JSON node of the ingress spec
+   * @return the properties as returned by {@link Selectors#propertiesAt}
+   */
+  static Map<String, String> clientConfigProperties(JsonNode ingressSpecNode) {
+    return Selectors.propertiesAt(ingressSpecNode, CLIENT_CONFIG_PROPS_POINTER);
+  }
+
+  /**
+   * Builds one {@link RoutingConfig} per entry of the {@code /streams} list.
+   *
+   * @param ingressSpecNode the JSON node of the ingress spec
+   * @return the routing configs keyed by stream name; if a stream name is listed more than once,
+   *     the last entry wins
+   */
+  static Map<String, RoutingConfig> routableStreams(JsonNode ingressSpecNode) {
+    final Map<String, RoutingConfig> configsByStream = new HashMap<>();
+    for (JsonNode streamNode : Selectors.listAt(ingressSpecNode, STREAMS_POINTER)) {
+      // the fields are read in the order: stream name, type url, targets
+      final String name = Selectors.textAt(streamNode, Streams.NAME_POINTER);
+      final RoutingConfig.Builder routing =
+          RoutingConfig.newBuilder()
+              .setTypeUrl(Selectors.textAt(streamNode, Streams.TYPE_URL_POINTER));
+      routing.addAllTargetFunctionTypes(parseRoutableTargetFunctionTypes(streamNode));
+      configsByStream.put(name, routing.build());
+    }
+    return configsByStream;
+  }
+
+  /** Maps a present startup position node to the SDK startup position matching its type. */
+  private static KinesisIngressStartupPosition startupPosition(JsonNode positionNode) {
+    final String type = Selectors.textAt(positionNode, StartupPosition.TYPE_POINTER);
+    if (type.equals(StartupPosition.EARLIEST_TYPE)) {
+      return KinesisIngressStartupPosition.fromEarliest();
+    }
+    if (type.equals(StartupPosition.LATEST_TYPE)) {
+      return KinesisIngressStartupPosition.fromLatest();
+    }
+    if (type.equals(StartupPosition.DATE_TYPE)) {
+      return KinesisIngressStartupPosition.fromDate(startupDate(positionNode));
+    }
+
+    final List<String> validValues =
+        Arrays.asList(
+            StartupPosition.EARLIEST_TYPE, StartupPosition.LATEST_TYPE, StartupPosition.DATE_TYPE);
+    throw new IllegalArgumentException(
+        "Invalid startup position type: "
+            + type
+            + "; valid values are ["
+            + String.join(", ", validValues)
+            + "]");
+  }
+
+  /** Converts the {@code /targets} entries of a stream node into target function types. */
+  private static List<TargetFunctionType> parseRoutableTargetFunctionTypes(JsonNode streamNode) {
+    final List<TargetFunctionType> functionTypes = new ArrayList<>();
+    for (String qualifiedName : Selectors.textListAt(streamNode, Streams.TARGETS_POINTER)) {
+      final NamespaceNamePair parsed = NamespaceNamePair.from(qualifiedName);
+      final TargetFunctionType.Builder functionType = TargetFunctionType.newBuilder();
+      functionType.setNamespace(parsed.namespace());
+      functionType.setType(parsed.name());
+      functionTypes.add(functionType.build());
+    }
+    return functionTypes;
+  }
+
+  /**
+   * Parses the {@code /date} text of a startup position node with the expected date pattern.
+   *
+   * @throws IllegalArgumentException if the text does not conform to the pattern; the original
+   *     parse failure is kept as the cause
+   */
+  private static ZonedDateTime startupDate(JsonNode positionNode) {
+    final String rawDate = Selectors.textAt(positionNode, StartupPosition.DATE_POINTER);
+    try {
+      return ZonedDateTime.parse(rawDate, StartupPosition.DATE_FORMATTER);
+    } catch (DateTimeParseException parseFailure) {
+      final String message =
+          "Unable to parse date string for startup position: "
+              + rawDate
+              + "; the date should conform to the pattern "
+              + StartupPosition.DATE_PATTERN;
+      throw new IllegalArgumentException(message, parseFailure);
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java
new file mode 100644
index 0000000..b8f02d8
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import java.util.Map;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+
+/**
+ * Deserializer that wraps every consumed Kinesis record into an {@link AutoRoutable} message,
+ * attaching the {@link RoutingConfig} registered for the stream the record was read from.
+ */
+public final class RoutableProtobufKinesisIngressDeserializer
+    implements KinesisIngressDeserializer<Message> {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Routing configs, looked up by the stream name of each consumed record. */
+  private final Map<String, RoutingConfig> routingConfigs;
+
+  /**
+   * @param routingConfigs routing configs keyed by stream name
+   * @throws IllegalArgumentException if the map is {@code null} or empty
+   */
+  RoutableProtobufKinesisIngressDeserializer(Map<String, RoutingConfig> routingConfigs) {
+    final boolean hasRoutes = routingConfigs != null && !routingConfigs.isEmpty();
+    if (!hasRoutes) {
+      throw new IllegalArgumentException(
+          "Routing config for routable Kinesis ingress cannot be empty.");
+    }
+    this.routingConfigs = routingConfigs;
+  }
+
+  /**
+   * Builds the {@link AutoRoutable} for a record: the config of the record's stream, the partition
+   * key as id and a copy of the record data as payload.
+   *
+   * @throws IllegalStateException if no routing config exists for the record's stream
+   */
+  @Override
+  public Message deserialize(IngressRecord ingressRecord) {
+    final String sourceStream = ingressRecord.getStream();
+    final RoutingConfig streamRouting = routingConfigs.get(sourceStream);
+
+    if (streamRouting != null) {
+      final AutoRoutable.Builder routable = AutoRoutable.newBuilder();
+      routable.setConfig(streamRouting);
+      routable.setId(ingressRecord.getPartitionKey());
+      routable.setPayloadBytes(ByteString.copyFrom(ingressRecord.getData()));
+      return routable.build();
+    }
+
+    throw new IllegalStateException(
+        "Consumed a record from stream [" + sourceStream + "], but no routing config was specified.");
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java
new file mode 100644
index 0000000..99776a3
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsCredentials;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsRegion;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.clientConfigProperties;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.optionalStartupPosition;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.routableStreams;
+
+import com.google.protobuf.Message;
+import java.util.ArrayList;
+import java.util.Map;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.kinesis.KinesisSourceProvider;
+import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
+import org.apache.flink.statefun.flink.io.spi.SourceProvider;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilderApiExtension;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+/**
+ * {@link SourceProvider} that turns a {@link JsonIngressSpec} into a {@link KinesisIngressSpec}
+ * using a {@link RoutableProtobufKinesisIngressDeserializer}, and then hands that spec over to a
+ * {@link KinesisSourceProvider}.
+ */
+public final class RoutableProtobufKinesisSourceProvider implements SourceProvider {
+
+  private final KinesisSourceProvider delegateProvider = new KinesisSourceProvider();
+
+  @Override
+  public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
+    return delegateProvider.forSpec(asKinesisIngressSpec(spec));
+  }
+
+  /**
+   * Translates the JSON based spec into a Kinesis ingress spec.
+   *
+   * @throws IllegalArgumentException if the spec is not a {@link JsonIngressSpec}, or if the type
+   *     produced by the ingress does not derive from {@link Message}
+   */
+  private static <T> KinesisIngressSpec<T> asKinesisIngressSpec(IngressSpec<T> spec) {
+    if (!(spec instanceof JsonIngressSpec)) {
+      throw new IllegalArgumentException("Wrong type " + spec.type());
+    }
+    final JsonIngressSpec<T> jsonSpec = (JsonIngressSpec<T>) spec;
+    final IngressIdentifier<T> id = jsonSpec.id();
+    requireMessageType(id.producedType());
+
+    final JsonNode specJson = jsonSpec.specJson();
+    final KinesisIngressBuilder<T> builder = KinesisIngressBuilder.forIdentifier(id);
+
+    // region, credentials and startup position are only set if the spec defines them
+    optionalAwsRegion(specJson).ifPresent(builder::withAwsRegion);
+    optionalAwsCredentials(specJson).ifPresent(builder::withAwsCredentials);
+    optionalStartupPosition(specJson).ifPresent(builder::withStartupPosition);
+
+    for (Map.Entry<String, String> property : clientConfigProperties(specJson).entrySet()) {
+      builder.withClientConfigurationProperty(property.getKey(), property.getValue());
+    }
+
+    // the names of the routable streams are also the streams the ingress consumes from
+    final Map<String, RoutingConfig> routingByStream = routableStreams(specJson);
+    KinesisIngressBuilderApiExtension.withDeserializer(builder, deserializer(routingByStream));
+    builder.withStreams(new ArrayList<>(routingByStream.keySet()));
+
+    return builder.build();
+  }
+
+  /** Rejects produced types that do not derive from {@link Message}. */
+  private static void requireMessageType(Class<?> producedType) {
+    if (Message.class.isAssignableFrom(producedType)) {
+      return;
+    }
+    throw new IllegalArgumentException(
+        "ProtocolBuffer based Kinesis ingress is only able to produce types that derive from "
+            + Message.class.getName()
+            + " but "
+            + producedType.getName()
+            + " is provided.");
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> KinesisIngressDeserializer<T> deserializer(
+      Map<String, RoutingConfig> routingConfig) {
+    // safe: the produced type T was verified to be a Message before this is called
+    final KinesisIngressDeserializer<?> routable =
+        new RoutableProtobufKinesisIngressDeserializer(routingConfig);
+    return (KinesisIngressDeserializer<T>) routable;
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java
new file mode 100644
index 0000000..bfa7ef9
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+/**
+ * Public static entry point for setting a {@link KinesisIngressDeserializer} instance on a {@link
+ * KinesisIngressBuilder}.
+ *
+ * <p>NOTE(review): this class is declared in the builder's own package, presumably because the
+ * instance-accepting {@code withDeserializer} of the builder is not public - confirm against
+ * {@code KinesisIngressBuilder}.
+ */
+public final class KinesisIngressBuilderApiExtension {
+
+  // static utility, not meant to be instantiated
+  private KinesisIngressBuilderApiExtension() {}
+
+  /**
+   * Sets the given deserializer instance on the given builder.
+   *
+   * @param kinesisIngressBuilder the builder to configure
+   * @param deserializer the deserializer instance passed on to the builder
+   * @param <T> the type produced by the ingress being built
+   */
+  public static <T> void withDeserializer(
+      KinesisIngressBuilder<T> kinesisIngressBuilder, KinesisIngressDeserializer<T> deserializer) {
+    kinesisIngressBuilder.withDeserializer(deserializer);
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java
new file mode 100644
index 0000000..81e6ff1
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.apache.flink.statefun.flink.io.testutils.YamlUtils.loadAsJsonFromClassResource;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.protobuf.Message;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.io.kinesis.polyglot.RoutableProtobufKinesisSourceProvider;
+import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.junit.Test;
+
+/** Checks the source created by {@link RoutableProtobufKinesisSourceProvider} for a YAML spec. */
+public class RoutableProtobufKinesisSourceProviderTest {
+
+  private static final String INGRESS_DEFINITION = "routable-protobuf-kinesis-ingress.yaml";
+
+  @Test
+  public void exampleUsage() {
+    SourceFunction<?> source = new RoutableProtobufKinesisSourceProvider().forSpec(exampleSpec());
+
+    assertThat(source, instanceOf(FlinkKinesisConsumer.class));
+  }
+
+  /** Wraps the example ingress definition from the test resources into a JSON ingress spec. */
+  private JsonIngressSpec<?> exampleSpec() {
+    JsonNode ingressDefinition =
+        loadAsJsonFromClassResource(getClass().getClassLoader(), INGRESS_DEFINITION);
+    return new JsonIngressSpec<>(
+        PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE,
+        new IngressIdentifier<>(Message.class, "foo", "bar"),
+        ingressDefinition);
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml b/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml
new file mode 100644
index 0000000..95616ee
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ingress:
+  meta:
+    type: statefun.kinesis.io/routable-protobuf-ingress
+    id: com.mycomp.foo/bar
+  spec:
+    awsRegion:
+      type: specific
+      id: us-west-2
+    awsCredentials:
+      type: basic
+      accessKeyId: my_access_key_id
+      secretAccessKey: my_secret_access_key
+    startupPosition:
+      type: earliest
+    streams:
+      - stream: stream-1
+        typeUrl: com.googleapis/com.mycomp.foo.MessageA
+        targets:
+          - com.mycomp.foo/function-1
+          - com.mycomp.foo/function-2
+      - stream: stream-2
+        typeUrl: com.googleapis/com.mycomp.foo.MessageB
+        targets:
+          - com.mycomp.foo/function-2
+    clientConfigProperties:
+      - SocketTimeout: 9999
+      - MaxConnections: 15
diff --git a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java
new file mode 100644
index 0000000..c6b384e
--- /dev/null
+++ b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import org.apache.flink.statefun.sdk.IngressType;
+
+/** Holder of the {@link IngressType} constant for the routable Protobuf Kinesis ingress. */
+public final class PolyglotKinesisIOTypes {
+
+  // constants holder, not meant to be instantiated
+  private PolyglotKinesisIOTypes() {}
+
+  /**
+   * Ingress type created from the strings {@code statefun.kinesis.io} and {@code
+   * routable-protobuf-ingress}.
+   */
+  public static final IngressType ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE =
+      new IngressType("statefun.kinesis.io", "routable-protobuf-ingress");
+}


[flink-statefun] 07/11: [FLINK-16124] [kinesis] Bind Generic Kafka egress type in KinesisFlinkIOModule

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 01866d5becdaa75f5b05ab9471359d583fc4f2b5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Mar 20 13:24:27 2020 +0800

    [FLINK-16124] [kinesis] Bind Generic Kafka egress type in KinesisFlinkIOModule
    
    This closes #64.
---
 .../apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java   | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java
index f404e32..fa813fd 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.io.kinesis;
 
 import com.google.auto.service.AutoService;
 import java.util.Map;
+import org.apache.flink.statefun.flink.io.kinesis.polyglot.GenericKinesisSinkProvider;
 import org.apache.flink.statefun.flink.io.kinesis.polyglot.RoutableProtobufKinesisSourceProvider;
 import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
 import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes;
@@ -33,5 +34,7 @@ public final class KinesisFlinkIOModule implements FlinkIoModule {
     binder.bindSourceProvider(
         PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE,
         new RoutableProtobufKinesisSourceProvider());
+    binder.bindSinkProvider(
+        PolyglotKinesisIOTypes.GENERIC_KINESIS_EGRESS_TYPE, new GenericKinesisSinkProvider());
   }
 }