You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/22 07:52:29 UTC
[flink-statefun] 01/11: [FLINK-16124] [kinesis] Implement runtime
RoutableProtobufKinesisSourceProvider
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 41c900e16efa1bbe29990701ce0be03a6da08df8
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Mar 18 01:44:23 2020 +0800
[FLINK-16124] [kinesis] Implement runtime RoutableProtobufKinesisSourceProvider
---
.../flink/io/kinesis/KinesisSourceProvider.java | 2 +-
.../io/kinesis/polyglot/AwsAuthSpecJsonParser.java | 130 ++++++++++++++++++
.../polyglot/KinesisIngressSpecJsonParser.java | 146 +++++++++++++++++++++
...RoutableProtobufKinesisIngressDeserializer.java | 59 +++++++++
.../RoutableProtobufKinesisSourceProvider.java | 98 ++++++++++++++
.../ingress/KinesisIngressBuilderApiExtension.java | 28 ++++
.../RoutableProtobufKinesisSourceProviderTest.java | 52 ++++++++
.../routable-protobuf-kinesis-ingress.yaml | 42 ++++++
.../flink/io/kinesis/PolyglotKinesisIOTypes.java | 29 ++++
9 files changed, 585 insertions(+), 1 deletion(-)
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
index 1a0ab09..93e322e 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-final class KinesisSourceProvider implements SourceProvider {
+public final class KinesisSourceProvider implements SourceProvider {
@Override
public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java
new file mode 100644
index 0000000..47c26e4
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+/** Reads the optional {@code awsRegion} and {@code awsCredentials} sections of a JSON spec. */
+final class AwsAuthSpecJsonParser {
+
+  private static final JsonPointer AWS_REGION_POINTER = JsonPointer.compile("/awsRegion");
+  private static final JsonPointer AWS_CREDENTIALS_POINTER = JsonPointer.compile("/awsCredentials");
+
+  /** Accepted {@code type} values and field pointers of the {@code awsRegion} section. */
+  private static final class Region {
+    private static final String DEFAULT_TYPE = "default";
+    private static final String SPECIFIED_ID_TYPE = "specific";
+    private static final String CUSTOM_ENDPOINT_TYPE = "custom-endpoint";
+
+    private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type");
+    private static final JsonPointer ID_POINTER = JsonPointer.compile("/id");
+    private static final JsonPointer ENDPOINT_POINTER = JsonPointer.compile("/endpoint");
+  }
+
+  /** Accepted {@code type} values and field pointers of the {@code awsCredentials} section. */
+  private static final class Credentials {
+    private static final String DEFAULT_TYPE = "default";
+    private static final String BASIC_TYPE = "basic";
+    private static final String PROFILE_TYPE = "profile";
+
+    private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type");
+    private static final JsonPointer ACCESS_KEY_ID_POINTER = JsonPointer.compile("/accessKeyId");
+    private static final JsonPointer SECRET_ACCESS_KEY_POINTER =
+        JsonPointer.compile("/secretAccessKey");
+    private static final JsonPointer PROFILE_NAME_POINTER = JsonPointer.compile("/profileName");
+    private static final JsonPointer PROFILE_PATH_POINTER = JsonPointer.compile("/profilePath");
+  }
+
+  private AwsAuthSpecJsonParser() {}
+
+  /** Parses the spec's {@code awsRegion} section; empty when the section is missing. */
+  static Optional<AwsRegion> optionalAwsRegion(JsonNode specNode) {
+    final JsonNode regionNode = specNode.at(AWS_REGION_POINTER);
+    if (regionNode.isMissingNode()) {
+      return Optional.empty();
+    }
+
+    final String regionType = Selectors.textAt(regionNode, Region.TYPE_POINTER);
+    switch (regionType) {
+      case Region.DEFAULT_TYPE:
+        return Optional.of(AwsRegion.fromDefaultProviderChain());
+      case Region.SPECIFIED_ID_TYPE:
+        return Optional.of(AwsRegion.ofId(Selectors.textAt(regionNode, Region.ID_POINTER)));
+      case Region.CUSTOM_ENDPOINT_TYPE:
+        final String endpoint = Selectors.textAt(regionNode, Region.ENDPOINT_POINTER);
+        final String regionId = Selectors.textAt(regionNode, Region.ID_POINTER);
+        return Optional.of(AwsRegion.ofCustomEndpoint(endpoint, regionId));
+      default:
+        throw invalidType(
+            "Invalid AWS region type: ",
+            regionType,
+            Arrays.asList(
+                Region.DEFAULT_TYPE, Region.SPECIFIED_ID_TYPE, Region.CUSTOM_ENDPOINT_TYPE));
+    }
+  }
+
+  /** Parses the spec's {@code awsCredentials} section; empty when the section is missing. */
+  static Optional<AwsCredentials> optionalAwsCredentials(JsonNode specNode) {
+    final JsonNode credentialsNode = specNode.at(AWS_CREDENTIALS_POINTER);
+    if (credentialsNode.isMissingNode()) {
+      return Optional.empty();
+    }
+
+    final String credentialsType = Selectors.textAt(credentialsNode, Credentials.TYPE_POINTER);
+    switch (credentialsType) {
+      case Credentials.DEFAULT_TYPE:
+        return Optional.of(AwsCredentials.fromDefaultProviderChain());
+      case Credentials.BASIC_TYPE:
+        final String accessKeyId =
+            Selectors.textAt(credentialsNode, Credentials.ACCESS_KEY_ID_POINTER);
+        final String secretAccessKey =
+            Selectors.textAt(credentialsNode, Credentials.SECRET_ACCESS_KEY_POINTER);
+        return Optional.of(AwsCredentials.basic(accessKeyId, secretAccessKey));
+      case Credentials.PROFILE_TYPE:
+        final Optional<String> profilePath =
+            Selectors.optionalTextAt(credentialsNode, Credentials.PROFILE_PATH_POINTER);
+        final String profileName =
+            Selectors.textAt(credentialsNode, Credentials.PROFILE_NAME_POINTER);
+        return Optional.of(
+            profilePath.isPresent()
+                ? AwsCredentials.profile(profileName, profilePath.get())
+                : AwsCredentials.profile(profileName));
+      default:
+        throw invalidType(
+            "Invalid AWS credential type: ",
+            credentialsType,
+            Arrays.asList(
+                Credentials.DEFAULT_TYPE, Credentials.BASIC_TYPE, Credentials.PROFILE_TYPE));
+    }
+  }
+
+  /** Creates the exception thrown for a {@code type} that is not one of {@code validTypes}. */
+  private static IllegalArgumentException invalidType(
+      String messagePrefix, String actualType, List<String> validTypes) {
+    return new IllegalArgumentException(
+        messagePrefix + actualType + "; valid values are [" + String.join(", ", validTypes) + "]");
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java
new file mode 100644
index 0000000..823cdfb
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition;
+
+/** Parses the Kinesis-ingress-specific sections of a JSON ingress spec. */
+final class KinesisIngressSpecJsonParser {
+
+  private static final JsonPointer STREAMS_POINTER = JsonPointer.compile("/streams");
+  private static final JsonPointer STARTUP_POSITION_POINTER =
+      JsonPointer.compile("/startupPosition");
+  private static final JsonPointer CLIENT_CONFIG_PROPS_POINTER =
+      JsonPointer.compile("/clientConfigProperties");
+
+  /** Field pointers of a single entry of the {@code streams} list. */
+  private static final class Streams {
+    private static final JsonPointer NAME_POINTER = JsonPointer.compile("/stream");
+    private static final JsonPointer TYPE_URL_POINTER = JsonPointer.compile("/typeUrl");
+    private static final JsonPointer TARGETS_POINTER = JsonPointer.compile("/targets");
+  }
+
+  /** Accepted {@code type} values, date format and field pointers of {@code startupPosition}. */
+  private static final class StartupPosition {
+    private static final String EARLIEST_TYPE = "earliest";
+    private static final String LATEST_TYPE = "latest";
+    private static final String DATE_TYPE = "date";
+
+    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z";
+    private static final DateTimeFormatter DATE_FORMATTER =
+        DateTimeFormatter.ofPattern(DATE_PATTERN);
+
+    private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type");
+    private static final JsonPointer DATE_POINTER = JsonPointer.compile("/date");
+  }
+
+  private KinesisIngressSpecJsonParser() {}
+
+  /** Parses the spec's {@code startupPosition} section; empty when the section is missing. */
+  static Optional<KinesisIngressStartupPosition> optionalStartupPosition(JsonNode ingressSpecNode) {
+    final JsonNode positionNode = ingressSpecNode.at(STARTUP_POSITION_POINTER);
+    if (positionNode.isMissingNode()) {
+      return Optional.empty();
+    }
+
+    final String type = Selectors.textAt(positionNode, StartupPosition.TYPE_POINTER);
+    switch (type) {
+      case StartupPosition.EARLIEST_TYPE:
+        return Optional.of(KinesisIngressStartupPosition.fromEarliest());
+      case StartupPosition.LATEST_TYPE:
+        return Optional.of(KinesisIngressStartupPosition.fromLatest());
+      case StartupPosition.DATE_TYPE:
+        final ZonedDateTime date = startupDate(positionNode);
+        return Optional.of(KinesisIngressStartupPosition.fromDate(date));
+      default:
+        final List<String> validTypes =
+            Arrays.asList(
+                StartupPosition.EARLIEST_TYPE,
+                StartupPosition.LATEST_TYPE,
+                StartupPosition.DATE_TYPE);
+        final String joined = String.join(", ", validTypes);
+        throw new IllegalArgumentException(
+            "Invalid startup position type: " + type + "; valid values are [" + joined + "]");
+    }
+  }
+
+  /** Returns the properties found at {@code /clientConfigProperties} of the given ingress spec. */
+  static Map<String, String> clientConfigProperties(JsonNode ingressSpecNode) {
+    return Selectors.propertiesAt(ingressSpecNode, CLIENT_CONFIG_PROPS_POINTER);
+  }
+
+  /**
+   * Maps each stream name of the {@code streams} list to its routing config; when a stream name
+   * is repeated, the last entry wins.
+   */
+  static Map<String, RoutingConfig> routableStreams(JsonNode ingressSpecNode) {
+    final Map<String, RoutingConfig> configsByStream = new HashMap<>();
+    for (JsonNode streamNode : Selectors.listAt(ingressSpecNode, STREAMS_POINTER)) {
+      final String streamName = Selectors.textAt(streamNode, Streams.NAME_POINTER);
+      final RoutingConfig.Builder config = RoutingConfig.newBuilder();
+      config.setTypeUrl(Selectors.textAt(streamNode, Streams.TYPE_URL_POINTER));
+      config.addAllTargetFunctionTypes(parseRoutableTargetFunctionTypes(streamNode));
+      configsByStream.put(streamName, config.build());
+    }
+    return configsByStream;
+  }
+
+  /** Converts the {@code targets} of a stream entry into target function types. */
+  private static List<TargetFunctionType> parseRoutableTargetFunctionTypes(
+      JsonNode routableStreamNode) {
+    final List<TargetFunctionType> functionTypes = new ArrayList<>();
+    for (String target : Selectors.textListAt(routableStreamNode, Streams.TARGETS_POINTER)) {
+      final NamespaceNamePair pair = NamespaceNamePair.from(target);
+      final TargetFunctionType.Builder functionType = TargetFunctionType.newBuilder();
+      functionType.setNamespace(pair.namespace()).setType(pair.name());
+      functionTypes.add(functionType.build());
+    }
+    return functionTypes;
+  }
+
+  /** Reads the {@code date} field and parses it with {@code StartupPosition.DATE_FORMATTER}. */
+  private static ZonedDateTime startupDate(JsonNode startupPositionSpecNode) {
+    final String rawDate = Selectors.textAt(startupPositionSpecNode, StartupPosition.DATE_POINTER);
+    try {
+      return ZonedDateTime.parse(rawDate, StartupPosition.DATE_FORMATTER);
+    } catch (DateTimeParseException e) {
+      final String message =
+          "Unable to parse date string for startup position: "
+              + rawDate
+              + "; the date should conform to the pattern "
+              + StartupPosition.DATE_PATTERN;
+      throw new IllegalArgumentException(message, e);
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java
new file mode 100644
index 0000000..b8f02d8
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import java.util.Map;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+
+/** Turns Kinesis ingress records into {@link AutoRoutable}s using per-stream routing configs. */
+public final class RoutableProtobufKinesisIngressDeserializer
+    implements KinesisIngressDeserializer<Message> {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Routing configs keyed by stream name; checked to be non-empty on construction. */
+  private final Map<String, RoutingConfig> routingConfigs;
+
+  RoutableProtobufKinesisIngressDeserializer(Map<String, RoutingConfig> routingConfigs) {
+    if (routingConfigs == null || routingConfigs.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Routing config for routable Kinesis ingress cannot be empty.");
+    }
+    this.routingConfigs = routingConfigs;
+  }
+
+  /** Fails with {@link IllegalStateException} when the record's stream has no routing config. */
+  @Override
+  public Message deserialize(IngressRecord ingressRecord) {
+    final String source = ingressRecord.getStream();
+    final RoutingConfig config = routingConfigs.get(source);
+    if (config == null) {
+      throw new IllegalStateException(
+          "Consumed a record from stream [" + source + "], but no routing config was specified.");
+    }
+    final AutoRoutable.Builder routable = AutoRoutable.newBuilder().setConfig(config);
+    routable.setId(ingressRecord.getPartitionKey());
+    routable.setPayloadBytes(ByteString.copyFrom(ingressRecord.getData()));
+    return routable.build();
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java
new file mode 100644
index 0000000..99776a3
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis.polyglot;
+
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsCredentials;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsRegion;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.clientConfigProperties;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.optionalStartupPosition;
+import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.routableStreams;
+
+import com.google.protobuf.Message;
+import java.util.ArrayList;
+import java.util.Map;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.kinesis.KinesisSourceProvider;
+import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
+import org.apache.flink.statefun.flink.io.spi.SourceProvider;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilderApiExtension;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+/** Source provider for routable Protobuf Kinesis ingresses that are described by a JSON spec. */
+public final class RoutableProtobufKinesisSourceProvider implements SourceProvider {
+
+  private final KinesisSourceProvider delegateProvider = new KinesisSourceProvider();
+
+  /** Translates the JSON spec into a {@link KinesisIngressSpec} and hands it to the delegate. */
+  @Override
+  public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
+    final KinesisIngressSpec<T> translated = asKinesisIngressSpec(spec);
+    return delegateProvider.forSpec(translated);
+  }
+
+  /**
+   * Builds the Kinesis ingress spec described by a {@link JsonIngressSpec}; any other spec type,
+   * or a produced type that is not a Protobuf {@link Message}, is rejected.
+   */
+  private static <T> KinesisIngressSpec<T> asKinesisIngressSpec(IngressSpec<T> spec) {
+    if (!(spec instanceof JsonIngressSpec)) {
+      throw new IllegalArgumentException("Wrong type " + spec.type());
+    }
+    final JsonIngressSpec<T> jsonSpec = (JsonIngressSpec<T>) spec;
+    final IngressIdentifier<T> ingressId = jsonSpec.id();
+    final Class<T> producedType = jsonSpec.id().producedType();
+    if (!Message.class.isAssignableFrom(producedType)) {
+      final String message =
+          "ProtocolBuffer based Kinesis ingress is only able to produce types that derive from "
+              + Message.class.getName()
+              + " but "
+              + producedType.getName()
+              + " is provided.";
+      throw new IllegalArgumentException(message);
+    }
+
+    final JsonNode json = jsonSpec.specJson();
+    final KinesisIngressBuilder<T> builder = KinesisIngressBuilder.forIdentifier(ingressId);
+    optionalAwsRegion(json).ifPresent(builder::withAwsRegion);
+    optionalAwsCredentials(json).ifPresent(builder::withAwsCredentials);
+    optionalStartupPosition(json).ifPresent(builder::withStartupPosition);
+    clientConfigProperties(json)
+        .forEach((key, value) -> builder.withClientConfigurationProperty(key, value));
+
+    final Map<String, RoutingConfig> routing = routableStreams(json);
+    final KinesisIngressDeserializer<T> routingDeserializer = deserializer(routing);
+    KinesisIngressBuilderApiExtension.withDeserializer(builder, routingDeserializer);
+    builder.withStreams(new ArrayList<>(routing.keySet()));
+    return builder.build();
+  }
+
+  /** Creates the deserializer that wraps consumed records using the given routing configs. */
+  @SuppressWarnings("unchecked")
+  private static <T> KinesisIngressDeserializer<T> deserializer(
+      Map<String, RoutingConfig> routingConfig) {
+    // this cast is safe since we've already checked that T is a Message
+    return (KinesisIngressDeserializer<T>)
+        new RoutableProtobufKinesisIngressDeserializer(routingConfig);
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java
new file mode 100644
index 0000000..bfa7ef9
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+/** Static bridge to {@code KinesisIngressBuilder#withDeserializer} for code in other packages. */
+public final class KinesisIngressBuilderApiExtension {
+  private KinesisIngressBuilderApiExtension() {}
+
+  public static <T> void withDeserializer(
+      KinesisIngressBuilder<T> kinesisIngressBuilder, KinesisIngressDeserializer<T> deserializer) {
+    kinesisIngressBuilder.withDeserializer(deserializer);
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java
new file mode 100644
index 0000000..81e6ff1
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.apache.flink.statefun.flink.io.testutils.YamlUtils.loadAsJsonFromClassResource;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.protobuf.Message;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.io.kinesis.polyglot.RoutableProtobufKinesisSourceProvider;
+import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.junit.Test;
+
+/** Smoke test: the provider turns the example YAML ingress definition into a Kinesis source. */
+public class RoutableProtobufKinesisSourceProviderTest {
+
+  @Test
+  public void exampleUsage() {
+    final ClassLoader resourceLoader = getClass().getClassLoader();
+    final JsonNode definition =
+        loadAsJsonFromClassResource(resourceLoader, "routable-protobuf-kinesis-ingress.yaml");
+    final IngressIdentifier<Message> ingressId =
+        new IngressIdentifier<>(Message.class, "foo", "bar");
+    final JsonIngressSpec<?> spec =
+        new JsonIngressSpec<>(
+            PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE, ingressId, definition);
+
+    final SourceFunction<?> source = new RoutableProtobufKinesisSourceProvider().forSpec(spec);
+
+    assertThat(source, instanceOf(FlinkKinesisConsumer.class));
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml b/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml
new file mode 100644
index 0000000..95616ee
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ingress:
+ meta:
+ type: statefun.kinesis.io/routable-protobuf-ingress
+ id: com.mycomp.foo/bar
+ spec:
+ awsRegion:
+ type: specific
+ id: us-west-2
+ awsCredentials:
+ type: basic
+ accessKeyId: my_access_key_id
+ secretAccessKey: my_secret_access_key
+ startupPosition:
+ type: earliest
+ streams:
+ - stream: stream-1
+ typeUrl: com.googleapis/com.mycomp.foo.MessageA
+ targets:
+ - com.mycomp.foo/function-1
+ - com.mycomp.foo/function-2
+ - stream: stream-2
+ typeUrl: com.googleapis/com.mycomp.foo.MessageB
+ targets:
+ - com.mycomp.foo/function-2
+ clientConfigProperties:
+ - SocketTimeout: 9999
+ - MaxConnections: 15
diff --git a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java
new file mode 100644
index 0000000..c6b384e
--- /dev/null
+++ b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import org.apache.flink.statefun.sdk.IngressType;
+
+/** Holds the {@link IngressType} constants of the polyglot Kinesis ingresses. */
+public final class PolyglotKinesisIOTypes {
+  public static final IngressType ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE =
+      new IngressType("statefun.kinesis.io", "routable-protobuf-ingress");
+
+  private PolyglotKinesisIOTypes() {}
+}