You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/22 07:52:29 UTC

[flink-statefun] 01/11: [FLINK-16124] [kinesis] Implement runtime RoutableProtobufKinesisSourceProvider

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 41c900e16efa1bbe29990701ce0be03a6da08df8
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Mar 18 01:44:23 2020 +0800

    [FLINK-16124] [kinesis] Implement runtime RoutableProtobufKinesisSourceProvider
---
 .../flink/io/kinesis/KinesisSourceProvider.java    |   2 +-
 .../io/kinesis/polyglot/AwsAuthSpecJsonParser.java | 130 ++++++++++++++++++
 .../polyglot/KinesisIngressSpecJsonParser.java     | 146 +++++++++++++++++++++
 ...RoutableProtobufKinesisIngressDeserializer.java |  59 +++++++++
 .../RoutableProtobufKinesisSourceProvider.java     |  98 ++++++++++++++
 .../ingress/KinesisIngressBuilderApiExtension.java |  28 ++++
 .../RoutableProtobufKinesisSourceProviderTest.java |  52 ++++++++
 .../routable-protobuf-kinesis-ingress.yaml         |  42 ++++++
 .../flink/io/kinesis/PolyglotKinesisIOTypes.java   |  29 ++++
 9 files changed, 585 insertions(+), 1 deletion(-)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
index 1a0ab09..93e322e 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 
-final class KinesisSourceProvider implements SourceProvider {
+public final class KinesisSourceProvider implements SourceProvider {
 
   @Override
   public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java
new file mode 100644
index 0000000..47c26e4
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+final class AwsAuthSpecJsonParser { // reads optional AWS region/credentials from spec JSON
+
+  private AwsAuthSpecJsonParser() {} // not instantiable; static methods only
+
+  private static final JsonPointer AWS_REGION_POINTER = JsonPointer.compile("/awsRegion");
+  private static final JsonPointer AWS_CREDENTIALS_POINTER = JsonPointer.compile("/awsCredentials");
+
+  private static final class Region { // type names and pointers under /awsRegion
+    private static final String DEFAULT_TYPE = "default";
+    private static final String SPECIFIED_ID_TYPE = "specific";
+    private static final String CUSTOM_ENDPOINT_TYPE = "custom-endpoint";
+
+    private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type");
+    private static final JsonPointer ID_POINTER = JsonPointer.compile("/id");
+    private static final JsonPointer ENDPOINT_POINTER = JsonPointer.compile("/endpoint");
+  }
+
+  private static final class Credentials { // type names and pointers under /awsCredentials
+    private static final String DEFAULT_TYPE = "default";
+    private static final String BASIC_TYPE = "basic";
+    private static final String PROFILE_TYPE = "profile";
+
+    private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type");
+    private static final JsonPointer ACCESS_KEY_ID_POINTER = JsonPointer.compile("/accessKeyId");
+    private static final JsonPointer SECRET_ACCESS_KEY_POINTER =
+        JsonPointer.compile("/secretAccessKey");
+    private static final JsonPointer PROFILE_NAME_POINTER = JsonPointer.compile("/profileName");
+    private static final JsonPointer PROFILE_PATH_POINTER = JsonPointer.compile("/profilePath");
+  }
+
+  static Optional<AwsRegion> optionalAwsRegion(JsonNode specNode) {
+    final JsonNode awsRegionSpecNode = specNode.at(AWS_REGION_POINTER);
+    if (awsRegionSpecNode.isMissingNode()) {
+      return Optional.empty(); // no /awsRegion entry in the spec
+    }
+
+    final String type = Selectors.textAt(awsRegionSpecNode, Region.TYPE_POINTER);
+    switch (type) { // one case per supported region type
+      case Region.DEFAULT_TYPE:
+        return Optional.of(AwsRegion.fromDefaultProviderChain());
+      case Region.SPECIFIED_ID_TYPE:
+        return Optional.of(AwsRegion.ofId(Selectors.textAt(awsRegionSpecNode, Region.ID_POINTER)));
+      case Region.CUSTOM_ENDPOINT_TYPE: // reads both /endpoint and /id
+        return Optional.of(
+            AwsRegion.ofCustomEndpoint(
+                Selectors.textAt(awsRegionSpecNode, Region.ENDPOINT_POINTER),
+                Selectors.textAt(awsRegionSpecNode, Region.ID_POINTER)));
+      default: // unknown type: reject and list the accepted values
+        final List<String> validValues =
+            Arrays.asList(
+                Region.DEFAULT_TYPE, Region.SPECIFIED_ID_TYPE, Region.CUSTOM_ENDPOINT_TYPE);
+        throw new IllegalArgumentException(
+            "Invalid AWS region type: "
+                + type
+                + "; valid values are ["
+                + String.join(", ", validValues)
+                + "]");
+    }
+  }
+
+  static Optional<AwsCredentials> optionalAwsCredentials(JsonNode specNode) {
+    final JsonNode awsCredentialsSpecNode = specNode.at(AWS_CREDENTIALS_POINTER);
+    if (awsCredentialsSpecNode.isMissingNode()) {
+      return Optional.empty(); // no /awsCredentials entry in the spec
+    }
+
+    final String type = Selectors.textAt(awsCredentialsSpecNode, Credentials.TYPE_POINTER);
+    switch (type) { // one case per supported credentials type
+      case Credentials.DEFAULT_TYPE:
+        return Optional.of(AwsCredentials.fromDefaultProviderChain());
+      case Credentials.BASIC_TYPE:
+        return Optional.of(
+            AwsCredentials.basic(
+                Selectors.textAt(awsCredentialsSpecNode, Credentials.ACCESS_KEY_ID_POINTER),
+                Selectors.textAt(awsCredentialsSpecNode, Credentials.SECRET_ACCESS_KEY_POINTER)));
+      case Credentials.PROFILE_TYPE:
+        final Optional<String> path =
+            Selectors.optionalTextAt(awsCredentialsSpecNode, Credentials.PROFILE_PATH_POINTER);
+        if (path.isPresent()) { // profilePath is optional; profileName is always read
+          return Optional.of(
+              AwsCredentials.profile(
+                  Selectors.textAt(awsCredentialsSpecNode, Credentials.PROFILE_NAME_POINTER),
+                  path.get()));
+        } else {
+          return Optional.of(
+              AwsCredentials.profile(
+                  Selectors.textAt(awsCredentialsSpecNode, Credentials.PROFILE_NAME_POINTER)));
+        }
+      default: // unknown type: reject and list the accepted values
+        final List<String> validValues =
+            Arrays.asList(
+                Credentials.DEFAULT_TYPE, Credentials.BASIC_TYPE, Credentials.PROFILE_TYPE);
+        throw new IllegalArgumentException(
+            "Invalid AWS credential type: "
+                + type
+                + "; valid values are ["
+                + String.join(", ", validValues)
+                + "]");
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java
new file mode 100644
index 0000000..823cdfb
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition;
+
+final class KinesisIngressSpecJsonParser { // parses the Kinesis ingress spec JSON
+
+  private KinesisIngressSpecJsonParser() {} // not instantiable; static methods only
+
+  private static final JsonPointer STREAMS_POINTER = JsonPointer.compile("/streams");
+  private static final JsonPointer STARTUP_POSITION_POINTER =
+      JsonPointer.compile("/startupPosition");
+  private static final JsonPointer CLIENT_CONFIG_PROPS_POINTER =
+      JsonPointer.compile("/clientConfigProperties");
+
+  private static final class Streams { // pointers within one /streams entry
+    private static final JsonPointer NAME_POINTER = JsonPointer.compile("/stream");
+    private static final JsonPointer TYPE_URL_POINTER = JsonPointer.compile("/typeUrl");
+    private static final JsonPointer TARGETS_POINTER = JsonPointer.compile("/targets");
+  }
+
+  private static final class StartupPosition { // type names and pointers under /startupPosition
+    private static final String EARLIEST_TYPE = "earliest";
+    private static final String LATEST_TYPE = "latest";
+    private static final String DATE_TYPE = "date";
+
+    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z";
+    private static final DateTimeFormatter DATE_FORMATTER =
+        DateTimeFormatter.ofPattern(DATE_PATTERN);
+
+    private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type");
+    private static final JsonPointer DATE_POINTER = JsonPointer.compile("/date");
+  }
+
+  static Optional<KinesisIngressStartupPosition> optionalStartupPosition(JsonNode ingressSpecNode) {
+    final JsonNode startupPositionSpecNode = ingressSpecNode.at(STARTUP_POSITION_POINTER);
+    if (startupPositionSpecNode.isMissingNode()) {
+      return Optional.empty(); // no /startupPosition entry in the spec
+    }
+
+    final String type = Selectors.textAt(startupPositionSpecNode, StartupPosition.TYPE_POINTER);
+    switch (type) { // one case per supported startup position type
+      case StartupPosition.EARLIEST_TYPE:
+        return Optional.of(KinesisIngressStartupPosition.fromEarliest());
+      case StartupPosition.LATEST_TYPE:
+        return Optional.of(KinesisIngressStartupPosition.fromLatest());
+      case StartupPosition.DATE_TYPE: // the date itself is read from /date
+        return Optional.of(
+            KinesisIngressStartupPosition.fromDate(startupDate(startupPositionSpecNode)));
+      default: // unknown type: reject and list the accepted values
+        final List<String> validValues =
+            Arrays.asList(
+                StartupPosition.EARLIEST_TYPE,
+                StartupPosition.LATEST_TYPE,
+                StartupPosition.DATE_TYPE);
+        throw new IllegalArgumentException(
+            "Invalid startup position type: "
+                + type
+                + "; valid values are ["
+                + String.join(", ", validValues)
+                + "]");
+    }
+  }
+
+  static Map<String, String> clientConfigProperties(JsonNode ingressSpecNode) {
+    return Selectors.propertiesAt(ingressSpecNode, CLIENT_CONFIG_PROPS_POINTER);
+  }
+
+  static Map<String, RoutingConfig> routableStreams(JsonNode ingressSpecNode) {
+    Map<String, RoutingConfig> routableStreams = new HashMap<>(); // keyed by stream name
+    for (JsonNode routableStreamNode : Selectors.listAt(ingressSpecNode, STREAMS_POINTER)) {
+      final String streamName = Selectors.textAt(routableStreamNode, Streams.NAME_POINTER);
+      final String typeUrl = Selectors.textAt(routableStreamNode, Streams.TYPE_URL_POINTER);
+      final List<TargetFunctionType> targets = parseRoutableTargetFunctionTypes(routableStreamNode);
+
+      routableStreams.put( // a repeated stream name replaces the earlier entry
+          streamName,
+          RoutingConfig.newBuilder()
+              .setTypeUrl(typeUrl)
+              .addAllTargetFunctionTypes(targets)
+              .build());
+    }
+    return routableStreams;
+  }
+
+  private static List<TargetFunctionType> parseRoutableTargetFunctionTypes(
+      JsonNode routableStreamNode) {
+    final List<TargetFunctionType> targets = new ArrayList<>();
+    for (String namespaceAndName : // one entry per string listed under /targets
+        Selectors.textListAt(routableStreamNode, Streams.TARGETS_POINTER)) {
+      NamespaceNamePair namespaceNamePair = NamespaceNamePair.from(namespaceAndName);
+      targets.add(
+          TargetFunctionType.newBuilder()
+              .setNamespace(namespaceNamePair.namespace())
+              .setType(namespaceNamePair.name())
+              .build());
+    }
+    return targets;
+  }
+
+  private static ZonedDateTime startupDate(JsonNode startupPositionSpecNode) {
+    final String dateStr = Selectors.textAt(startupPositionSpecNode, StartupPosition.DATE_POINTER);
+    try {
+      return ZonedDateTime.parse(dateStr, StartupPosition.DATE_FORMATTER);
+    } catch (DateTimeParseException e) { // rethrow naming the expected pattern; cause is kept
+      throw new IllegalArgumentException(
+          "Unable to parse date string for startup position: "
+              + dateStr
+              + "; the date should conform to the pattern "
+              + StartupPosition.DATE_PATTERN,
+          e);
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java
new file mode 100644
index 0000000..b8f02d8
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import java.util.Map;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+
+public final class RoutableProtobufKinesisIngressDeserializer // wraps records as AutoRoutable
+    implements KinesisIngressDeserializer<Message> {
+
+  private static final long serialVersionUID = 1L;
+
+  private final Map<String, RoutingConfig> routingConfigs; // stream name -> routing config
+
+  RoutableProtobufKinesisIngressDeserializer(Map<String, RoutingConfig> routingConfigs) {
+    if (routingConfigs == null || routingConfigs.isEmpty()) { // at least one entry required
+      throw new IllegalArgumentException(
+          "Routing config for routable Kinesis ingress cannot be empty.");
+    }
+    this.routingConfigs = routingConfigs; // stored by reference, not copied
+  }
+
+  @Override
+  public Message deserialize(IngressRecord ingressRecord) {
+    final String stream = ingressRecord.getStream();
+
+    final RoutingConfig routingConfig = routingConfigs.get(stream);
+    if (routingConfig == null) { // record came from a stream with no configured routing
+      throw new IllegalStateException(
+          "Consumed a record from stream [" + stream + "], but no routing config was specified.");
+    }
+
+    return AutoRoutable.newBuilder()
+        .setConfig(routingConfig)
+        .setId(ingressRecord.getPartitionKey()) // the partition key is used as the id
+        .setPayloadBytes(ByteString.copyFrom(ingressRecord.getData()))
+        .build();
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java
new file mode 100644
index 0000000..99776a3
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsCredentials;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsRegion;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.clientConfigProperties;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.optionalStartupPosition;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.routableStreams;
+
+import com.google.protobuf.Message;
+import java.util.ArrayList;
+import java.util.Map;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.kinesis.KinesisSourceProvider;
+import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
+import org.apache.flink.statefun.flink.io.spi.SourceProvider;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilderApiExtension;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+public final class RoutableProtobufKinesisSourceProvider implements SourceProvider {
+
+  private final KinesisSourceProvider delegateProvider = new KinesisSourceProvider();
+
+  /** Converts the JSON-defined spec to a KinesisIngressSpec and delegates source creation. */
+  @Override
+  public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
+    final KinesisIngressSpec<T> kinesisIngressSpec = asKinesisIngressSpec(spec);
+    return delegateProvider.forSpec(kinesisIngressSpec);
+  }
+
+  /** Builds a KinesisIngressSpec from a JsonIngressSpec whose produced type is a Message. */
+  private static <T> KinesisIngressSpec<T> asKinesisIngressSpec(IngressSpec<T> spec) {
+    if (!(spec instanceof JsonIngressSpec)) {
+      throw new IllegalArgumentException("Wrong type " + spec.type());
+    }
+    JsonIngressSpec<T> casted = (JsonIngressSpec<T>) spec;
+
+    IngressIdentifier<T> id = casted.id();
+    Class<T> producedType = id.producedType();
+    if (!Message.class.isAssignableFrom(producedType)) {
+      throw new IllegalArgumentException(
+          "ProtocolBuffer based Kinesis ingress is only able to produce types that derive from "
+              + Message.class.getName()
+              + " but "
+              + producedType.getName()
+              + " is provided.");
+    }
+
+    JsonNode specJson = casted.specJson();
+
+    KinesisIngressBuilder<T> kinesisIngressBuilder = KinesisIngressBuilder.forIdentifier(id);
+
+    optionalAwsRegion(specJson).ifPresent(kinesisIngressBuilder::withAwsRegion);
+    optionalAwsCredentials(specJson).ifPresent(kinesisIngressBuilder::withAwsCredentials);
+    optionalStartupPosition(specJson).ifPresent(kinesisIngressBuilder::withStartupPosition);
+    clientConfigProperties(specJson)
+        .forEach(
+            (key, value) -> kinesisIngressBuilder.withClientConfigurationProperty(key, value));
+
+    // the keys of the routing map are also the names of the streams to consume
+    Map<String, RoutingConfig> routableStreams = routableStreams(specJson);
+    KinesisIngressBuilderApiExtension.withDeserializer(
+        kinesisIngressBuilder, deserializer(routableStreams));
+    kinesisIngressBuilder.withStreams(new ArrayList<>(routableStreams.keySet()));
+
+    return kinesisIngressBuilder.build();
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> KinesisIngressDeserializer<T> deserializer(
+      Map<String, RoutingConfig> routingConfig) {
+    // this cast is safe since we've already checked that T is a Message
+    return (KinesisIngressDeserializer<T>)
+        new RoutableProtobufKinesisIngressDeserializer(routingConfig);
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java
new file mode 100644
index 0000000..bfa7ef9
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+public final class KinesisIngressBuilderApiExtension { // static pass-through to the builder
+
+  private KinesisIngressBuilderApiExtension() {} // not instantiable; static methods only
+
+  public static <T> void withDeserializer(
+      KinesisIngressBuilder<T> kinesisIngressBuilder, KinesisIngressDeserializer<T> deserializer) {
+    kinesisIngressBuilder.withDeserializer(deserializer); // presumably non-public; confirm
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java
new file mode 100644
index 0000000..81e6ff1
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.apache.flink.statefun.flink.io.testutils.YamlUtils.loadAsJsonFromClassResource;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.protobuf.Message;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.io.kinesis.polyglot.RoutableProtobufKinesisSourceProvider;
+import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.junit.Test;
+
+public class RoutableProtobufKinesisSourceProviderTest {
+
+  @Test
+  public void exampleUsage() { // smoke test: YAML ingress spec -> Flink Kinesis source
+    JsonNode ingressDefinition =
+        loadAsJsonFromClassResource(
+            getClass().getClassLoader(), "routable-protobuf-kinesis-ingress.yaml");
+    JsonIngressSpec<?> spec =
+        new JsonIngressSpec<>(
+            PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE,
+            new IngressIdentifier<>(Message.class, "foo", "bar"),
+            ingressDefinition);
+
+    RoutableProtobufKinesisSourceProvider provider = new RoutableProtobufKinesisSourceProvider();
+    SourceFunction<?> source = provider.forSpec(spec);
+
+    assertThat(source, instanceOf(FlinkKinesisConsumer.class)); // only the type is checked
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml b/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml
new file mode 100644
index 0000000..95616ee
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ingress:
+  meta:
+    type: statefun.kinesis.io/routable-protobuf-ingress
+    id: com.mycomp.foo/bar
+  spec:
+    awsRegion:
+      type: specific # parser accepts: default, specific, custom-endpoint
+      id: us-west-2
+    awsCredentials:
+      type: basic # parser accepts: default, basic, profile
+      accessKeyId: my_access_key_id
+      secretAccessKey: my_secret_access_key
+    startupPosition:
+      type: earliest # parser accepts: earliest, latest, date
+    streams:
+      - stream: stream-1
+        typeUrl: com.googleapis/com.mycomp.foo.MessageA
+        targets:
+          - com.mycomp.foo/function-1
+          - com.mycomp.foo/function-2
+      - stream: stream-2
+        typeUrl: com.googleapis/com.mycomp.foo.MessageB
+        targets:
+          - com.mycomp.foo/function-2
+    clientConfigProperties:
+      - SocketTimeout: 9999
+      - MaxConnections: 15
diff --git a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java
new file mode 100644
index 0000000..c6b384e
--- /dev/null
+++ b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import org.apache.flink.statefun.sdk.IngressType;
+
+public final class PolyglotKinesisIOTypes { // ingress type constants for Kinesis
+
+  private PolyglotKinesisIOTypes() {} // constants holder; not instantiable
+
+  public static final IngressType ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE =
+      new IngressType("statefun.kinesis.io", "routable-protobuf-ingress");
+}