You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/19 04:22:08 UTC
[flink-statefun] 01/07: [FLINK-16124] [kinesis] Add Java Kinesis
Ingress SDK classes
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 818fa819bc67716d7b9e41405668a6086c0b5b86
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 00:01:50 2020 +0800
[FLINK-16124] [kinesis] Add Java Kinesis Ingress SDK classes
---
.../flink/statefun/sdk/kinesis/KinesisIOTypes.java | 28 ++++
.../statefun/sdk/kinesis/auth/AwsCredentials.java | 132 ++++++++++++++++
.../flink/statefun/sdk/kinesis/auth/AwsRegion.java | 126 ++++++++++++++++
.../sdk/kinesis/ingress/IngressRecord.java | 129 ++++++++++++++++
.../sdk/kinesis/ingress/KinesisIngressBuilder.java | 168 +++++++++++++++++++++
.../ingress/KinesisIngressDeserializer.java | 37 +++++
.../sdk/kinesis/ingress/KinesisIngressSpec.java | 94 ++++++++++++
.../ingress/KinesisIngressStartupPosition.java | 111 ++++++++++++++
.../sdk/kinesis/KinesisIngressBuilderTest.java | 66 ++++++++
tools/maven/spotbugs-exclude.xml | 10 ++
10 files changed, 901 insertions(+)
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
new file mode 100644
index 0000000..4b57b0a
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis;
+
+import org.apache.flink.statefun.sdk.IngressType;
+
+public final class KinesisIOTypes {
+
+ private KinesisIOTypes() {}
+
+ public static final IngressType UNIVERSAL_INGRESS_TYPE =
+ new IngressType("statefun.kinesis.io", "universal-ingress");
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsCredentials.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsCredentials.java
new file mode 100644
index 0000000..0d9f5e9
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsCredentials.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.auth;
+
+import java.util.Objects;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+/** AWS credentials to use for connecting to AWS Kinesis. */
+public abstract class AwsCredentials {
+
+ private AwsCredentials() {}
+
+ /** Consults AWS's default provider chain to determine the AWS credentials. */
+ public static AwsCredentials fromDefaultProviderChain() {
+ return DefaultAwsCredentials.INSTANCE;
+ }
+
+ /**
+ * Specifies the AWS credentials directly with provided access key ID and secret access key
+ * strings.
+ */
+ public static AwsCredentials basic(String accessKeyId, String secretAccessKey) {
+ return new BasicAwsCredentials(accessKeyId, secretAccessKey);
+ }
+
+ /** Specifies the AWS credentials using an AWS configuration profile. */
+ public static AwsCredentials profile(String profileName) {
+ return new ProfileAwsCredentials(profileName, null);
+ }
+
+ /**
+ * Specifies the AWS credentials using an AWS configuration profile, along with the profile's
+ * configuration path.
+ */
+ public static AwsCredentials profile(String profileName, String profilePath) {
+ return new ProfileAwsCredentials(profileName, profilePath);
+ }
+
+ /**
+ * Checks whether the credentials is configured to be obtained from AWS's default provider chain.
+ */
+ public boolean isDefault() {
+ return getClass() == DefaultAwsCredentials.class;
+ }
+
+ /**
+ * Checks whether the credentials is specified using directly provided access key ID and secret
+ * access key strings.
+ */
+ public boolean isBasic() {
+ return getClass() == BasicAwsCredentials.class;
+ }
+
+ /** Checks whether the credentials is configured using AWS configuration profiles. */
+ public boolean isProfile() {
+ return getClass() == ProfileAwsCredentials.class;
+ }
+
+ /** Returns this as a {@link BasicAwsCredentials}. */
+ public BasicAwsCredentials asBasic() {
+ if (!isBasic()) {
+ throw new IllegalStateException(
+ "This AWS credential is not defined with basic access key id and secret key.");
+ }
+ return (BasicAwsCredentials) this;
+ }
+
+ /** Returns this as a {@link ProfileAwsCredentials}. */
+ public ProfileAwsCredentials asProfile() {
+ if (!isProfile()) {
+ throw new IllegalStateException(
+ "This AWS credential is not defined with a AWS configuration profile");
+ }
+ return (ProfileAwsCredentials) this;
+ }
+
+ public static final class DefaultAwsCredentials extends AwsCredentials {
+ private static final DefaultAwsCredentials INSTANCE = new DefaultAwsCredentials();
+ }
+
+ public static final class BasicAwsCredentials extends AwsCredentials {
+ private final String accessKeyId;
+ private final String secretAccessKey;
+
+ BasicAwsCredentials(String accessKeyId, String secretAccessKey) {
+ this.accessKeyId = Objects.requireNonNull(accessKeyId);
+ this.secretAccessKey = Objects.requireNonNull(secretAccessKey);
+ }
+
+ public String accessKeyId() {
+ return accessKeyId;
+ }
+
+ public String secretAccessKey() {
+ return secretAccessKey;
+ }
+ }
+
+ public static final class ProfileAwsCredentials extends AwsCredentials {
+ private final String profileName;
+ @Nullable private final String profilePath;
+
+ ProfileAwsCredentials(String profileName, @Nullable String profilePath) {
+ this.profileName = Objects.requireNonNull(profileName);
+ this.profilePath = profilePath;
+ }
+
+ public String name() {
+ return profileName;
+ }
+
+ public Optional<String> path() {
+ return Optional.ofNullable(profilePath);
+ }
+ }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsRegion.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsRegion.java
new file mode 100644
index 0000000..511aeec
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsRegion.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.auth;
+
+import java.net.URI;
+import java.util.Objects;
+
+/** AWS region to use for connecting to AWS Kinesis. */
+public abstract class AwsRegion {
+
+ private AwsRegion() {}
+
+ /** Consults AWS's default provider chain to determine the AWS region. */
+ public static AwsRegion fromDefaultProviderChain() {
+ return DefaultAwsRegion.INSTANCE;
+ }
+
+ /** Specifies an AWS region using the region's unique id. */
+ public static AwsRegion ofId(String id) {
+ return new SpecificIdAwsRegion(id);
+ }
+
+ /**
+ * Connects to an AWS region through a non-standard AWS service endpoint. This is typically used
+ * only for development and testing purposes.
+ */
+ public static AwsRegion ofCustomEndpoint(String serviceEndpoint, String regionId) {
+ return new CustomEndpointAwsRegion(serviceEndpoint, regionId);
+ }
+
+ /** Checks whether the region is configured to be obtained from AWS's default provider chain. */
+ public boolean isDefault() {
+ return getClass() == DefaultAwsRegion.class;
+ }
+
+ /** Checks whether the region is specified with the region's unique id. */
+ public boolean isId() {
+ return getClass() == SpecificIdAwsRegion.class;
+ }
+
+ /** Checks whether the region is specified with a custom non-standard AWS service endpoint. */
+ public boolean isCustomEndpoint() {
+ return getClass() == CustomEndpointAwsRegion.class;
+ }
+
+ /** Returns this region as a {@link SpecificIdAwsRegion}. */
+ public SpecificIdAwsRegion asId() {
+ if (!isId()) {
+ throw new IllegalStateException(
+ "This is not an AWS region specified with using the region's unique id.");
+ }
+ return (SpecificIdAwsRegion) this;
+ }
+
+ /** Returns this region as a {@link CustomEndpointAwsRegion}. */
+ public CustomEndpointAwsRegion asCustomEndpoint() {
+ if (!isCustomEndpoint()) {
+ throw new IllegalStateException(
+ "This is not an AWS region specified with a custom endpoint.");
+ }
+ return (CustomEndpointAwsRegion) this;
+ }
+
+ public static final class DefaultAwsRegion extends AwsRegion {
+ private static final DefaultAwsRegion INSTANCE = new DefaultAwsRegion();
+ }
+
+ public static final class SpecificIdAwsRegion extends AwsRegion {
+ private final String regionId;
+
+ SpecificIdAwsRegion(String regionId) {
+ this.regionId = Objects.requireNonNull(regionId);
+ }
+
+ public String id() {
+ return regionId;
+ }
+ }
+
+ public static final class CustomEndpointAwsRegion extends AwsRegion {
+ private final String serviceEndpoint;
+ private final String regionId;
+
+ CustomEndpointAwsRegion(String serviceEndpoint, String regionId) {
+ this.serviceEndpoint = requireValidEndpoint(serviceEndpoint);
+ this.regionId = Objects.requireNonNull(regionId);
+ }
+
+ public String serviceEndpoint() {
+ return serviceEndpoint;
+ }
+
+ public String regionId() {
+ return regionId;
+ }
+
+ private static String requireValidEndpoint(String serviceEndpoint) {
+ Objects.requireNonNull(serviceEndpoint);
+
+ final URI uri = URI.create(serviceEndpoint);
+ if (!uri.getScheme().equalsIgnoreCase("https")) {
+ throw new IllegalArgumentException(
+ "Invalid service endpoint url: "
+ + serviceEndpoint
+ + "; Only custom service endpoints using HTTPS are supported");
+ }
+
+ return serviceEndpoint;
+ }
+ }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/IngressRecord.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/IngressRecord.java
new file mode 100644
index 0000000..cbfc1f7
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/IngressRecord.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.util.Objects;
+
+/** A record consumed from AWS Kinesis. */
+public final class IngressRecord {
+ private final byte[] data;
+ private final String stream;
+ private final String shardId;
+ private final String partitionKey;
+ private final String sequenceNumber;
+ private final long approximateArrivalTimestamp;
+
+ /** @return A builder for a {@link IngressRecord}. */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ private IngressRecord(
+ byte[] data,
+ String stream,
+ String shardId,
+ String partitionKey,
+ String sequenceNumber,
+ long approximateArrivalTimestamp) {
+ this.data = Objects.requireNonNull(data, "data bytes");
+ this.stream = Objects.requireNonNull(stream, "source stream");
+ this.shardId = Objects.requireNonNull(shardId, "source shard id");
+ this.partitionKey = Objects.requireNonNull(partitionKey, "partition key");
+ this.sequenceNumber = Objects.requireNonNull(sequenceNumber, "sequence number");
+ this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+ }
+
+ /** @return consumed data bytes */
+ public byte[] getData() {
+ return data;
+ }
+
+ /** @return source AWS Kinesis stream */
+ public String getStream() {
+ return stream;
+ }
+
+ /** @return source AWS Kinesis stream shard */
+ public String getShardId() {
+ return shardId;
+ }
+
+ /** @return attached partition key */
+ public String getPartitionKey() {
+ return partitionKey;
+ }
+
+ /** @return sequence number of the consumed record */
+ public String getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ /**
+ * @return approximate arrival timestamp (ingestion time at AWS Kinesis) of the consumed record
+ */
+ public long getApproximateArrivalTimestamp() {
+ return approximateArrivalTimestamp;
+ }
+
+ /** Builder for {@link IngressRecord}. */
+ public static final class Builder {
+ private byte[] data;
+ private String stream;
+ private String shardId;
+ private String partitionKey;
+ private String sequenceNumber;
+ long approximateArrivalTimestamp;
+
+ private Builder() {}
+
+ public Builder withData(byte[] data) {
+ this.data = data;
+ return this;
+ }
+
+ public Builder withStream(String stream) {
+ this.stream = stream;
+ return this;
+ }
+
+ public Builder withShardId(String shardId) {
+ this.shardId = shardId;
+ return this;
+ }
+
+ public Builder withPartitionKey(String partitionKey) {
+ this.partitionKey = partitionKey;
+ return this;
+ }
+
+ public Builder withSequenceNumber(String sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ return this;
+ }
+
+ public Builder withApproximateArrivalTimestamp(long approximateArrivalTimestamp) {
+ this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+ return this;
+ }
+
+ public IngressRecord build() {
+ return new IngressRecord(
+ data, stream, shardId, partitionKey, sequenceNumber, approximateArrivalTimestamp);
+ }
+ }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
new file mode 100644
index 0000000..2d71ae6
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+/**
+ * A builder for creating an {@link IngressSpec} for consuming data from AWS Kinesis.
+ *
+ * @param <T> The type consumed from AWS Kinesis.
+ */
+public final class KinesisIngressBuilder<T> {
+
+ private final IngressIdentifier<T> id;
+
+ private final List<String> streams = new ArrayList<>();
+ private Class<? extends KinesisIngressDeserializer<T>> deserializerClass;
+ private KinesisIngressStartupPosition startupPosition =
+ KinesisIngressStartupPosition.fromLatest();
+ private AwsRegion awsRegion = AwsRegion.fromDefaultProviderChain();
+ private AwsCredentials awsCredentials = AwsCredentials.fromDefaultProviderChain();
+ private final Properties clientConfigurationProperties = new Properties();
+
+ private KinesisIngressBuilder(IngressIdentifier<T> id) {
+ this.id = Objects.requireNonNull(id);
+ }
+
+ /**
+ * @param id A unique ingress identifier.
+ * @param <T> The type consumed from Kinesis.
+ * @return A new {@link KinesisIngressBuilder}.
+ */
+ public static <T> KinesisIngressBuilder<T> forIdentifier(IngressIdentifier<T> id) {
+ return new KinesisIngressBuilder<>(id);
+ }
+
+ /** @param stream The name of a stream that should be consumed. */
+ public KinesisIngressBuilder<T> withStream(String stream) {
+ this.streams.add(stream);
+ return this;
+ }
+
+ /** @param streams A list of streams that should be consumed. */
+ public KinesisIngressBuilder<T> withStreams(List<String> streams) {
+ this.streams.addAll(streams);
+ return this;
+ }
+
+ /**
+ * @param deserializerClass The deserializer used to convert between Kinesis's byte messages and
+ * Java objects.
+ */
+ public KinesisIngressBuilder<T> withDeserializer(
+ Class<? extends KinesisIngressDeserializer<T>> deserializerClass) {
+ this.deserializerClass = Objects.requireNonNull(deserializerClass);
+ return this;
+ }
+
+ /**
+ * Configures the position that the ingress should start consuming from. By default, the startup
+ * position is {@link KinesisIngressStartupPosition#fromLatest()}.
+ *
+ * <p>Note that this configuration only affects the position when starting the application from a
+ * fresh start. When restoring the application from a savepoint, the ingress will always start
+ * consuming from the position persisted in the savepoint.
+ *
+ * @param startupPosition the position that the Kafka ingress should start consuming from.
+ * @see KinesisIngressStartupPosition
+ */
+ public KinesisIngressBuilder<T> withStartupPosition(
+ KinesisIngressStartupPosition startupPosition) {
+ this.startupPosition = Objects.requireNonNull(startupPosition);
+ return this;
+ }
+
+ /**
+ * The AWS region to connect to. By default, AWS's default provider chain is consulted.
+ *
+ * @param awsRegion The AWS region to connect to.
+ * @see <a
+ * href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-region-selection.html#automatically-determine-the-aws-region-from-the-environment">Automatically
+ * Determine the AWS Region from the Environment</a>.
+ * @see AwsRegion
+ */
+ public KinesisIngressBuilder<T> withAwsRegion(AwsRegion awsRegion) {
+ this.awsRegion = Objects.requireNonNull(awsRegion);
+ return this;
+ }
+
+ /**
+ * The AWS region to connect to, specified by the AWS region's unique id. By default, AWS's
+ * default provider chain is consulted.
+ *
+ * @param regionName The unique id of the AWS region to connect to.
+ */
+ public KinesisIngressBuilder<T> withAwsRegion(String regionName) {
+ this.awsRegion = AwsRegion.ofId(regionName);
+ return this;
+ }
+
+ /**
+ * The AWS credentials to use. By default, AWS's default provider chain is consulted.
+ *
+ * @param awsCredentials The AWS credentials to use.
+ * @see <a
+ * href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default">Using
+ * the Default Credential Provider Chain</a>.
+ * @see AwsCredentials
+ */
+ public KinesisIngressBuilder<T> withAwsCredentials(AwsCredentials awsCredentials) {
+ this.awsCredentials = Objects.requireNonNull(awsCredentials);
+ return this;
+ }
+
+ /**
+ * Sets a AWS client configuration to be used by the ingress.
+ *
+ * <p>Supported values are properties of AWS's <a
+ * href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.aws.ClientConfiguration</a>.
+ * For example, to set a value for {@code SOCKET_TIMEOUT}, the property key would be {@code
+ * SocketTimeout}.
+ *
+ * @param key the property to set.
+ * @param value the value for the property.
+ * @see <a
+ * href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.aws.ClientConfiguration</a>.
+ */
+ public KinesisIngressBuilder<T> withClientConfigurationProperty(String key, String value) {
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(value);
+ this.clientConfigurationProperties.setProperty(key, value);
+ return this;
+ }
+
+ /** @return A new {@link KinesisIngressSpec}. */
+ public KinesisIngressSpec<T> build() {
+ return new KinesisIngressSpec<>(
+ id,
+ streams,
+ deserializerClass,
+ startupPosition,
+ awsRegion,
+ awsCredentials,
+ clientConfigurationProperties);
+ }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
new file mode 100644
index 0000000..226d0fb
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.io.Serializable;
+
+/**
+ * Describes how to deserialize {@link IngressRecord}s consumed from AWS Kinesis into data types
+ * that are processed by the system.
+ *
+ * @param <T> The type created by the ingress deserializer.
+ */
+public interface KinesisIngressDeserializer<T> extends Serializable {
+
+ /**
+ * Deserialize an input value from a {@link IngressRecord} consumed from AWS Kinesis.
+ *
+ * @param ingressRecord the {@link IngressRecord} consumed from AWS Kinesis.
+ * @return the deserialized data object.
+ */
+ T deserialize(IngressRecord ingressRecord);
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
new file mode 100644
index 0000000..9bc4a9f
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+public final class KinesisIngressSpec<T> implements IngressSpec<T> {
+ private final IngressIdentifier<T> ingressIdentifier;
+ private final List<String> streams;
+ private final Class<? extends KinesisIngressDeserializer<T>> deserializerClass;
+ private final KinesisIngressStartupPosition startupPosition;
+ private final AwsRegion awsRegion;
+ private final AwsCredentials awsCredentials;
+ private final Properties clientConfigurationProperties;
+
+ KinesisIngressSpec(
+ IngressIdentifier<T> ingressIdentifier,
+ List<String> streams,
+ Class<? extends KinesisIngressDeserializer<T>> deserializerClass,
+ KinesisIngressStartupPosition startupPosition,
+ AwsRegion awsRegion,
+ AwsCredentials awsCredentials,
+ Properties clientConfigurationProperties) {
+ this.ingressIdentifier = Objects.requireNonNull(ingressIdentifier, "ingress identifier");
+ this.deserializerClass = Objects.requireNonNull(deserializerClass, "deserializer class");
+ this.startupPosition = Objects.requireNonNull(startupPosition, "startup position");
+ this.awsRegion = Objects.requireNonNull(awsRegion, "AWS region configuration");
+ this.awsCredentials = Objects.requireNonNull(awsCredentials, "AWS credentials configuration");
+ this.clientConfigurationProperties = Objects.requireNonNull(clientConfigurationProperties);
+
+ this.streams = Objects.requireNonNull(streams, "AWS Kinesis stream names");
+ if (streams.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Must have at least one stream to consume from specified.");
+ }
+ }
+
+ @Override
+ public IngressIdentifier<T> id() {
+ return ingressIdentifier;
+ }
+
+ @Override
+ public IngressType type() {
+ return KinesisIOTypes.UNIVERSAL_INGRESS_TYPE;
+ }
+
+ public List<String> streams() {
+ return streams;
+ }
+
+ public Class<? extends KinesisIngressDeserializer<T>> deserializerClass() {
+ return deserializerClass;
+ }
+
+ public KinesisIngressStartupPosition startupPosition() {
+ return startupPosition;
+ }
+
+ public AwsRegion awsRegion() {
+ return awsRegion;
+ }
+
+ public AwsCredentials awsCredentials() {
+ return awsCredentials;
+ }
+
+ public Properties clientConfigurationProperties() {
+ return clientConfigurationProperties;
+ }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressStartupPosition.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressStartupPosition.java
new file mode 100644
index 0000000..58aec1e
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressStartupPosition.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.time.ZonedDateTime;
+
+/** Position for the ingress to start consuming AWS Kinesis shards. */
+public abstract class KinesisIngressStartupPosition {
+
+ private KinesisIngressStartupPosition() {}
+
+ /** Start consuming from the earliest position possible. */
+ public static KinesisIngressStartupPosition fromEarliest() {
+ return EarliestPosition.INSTANCE;
+ }
+
+ /** Start consuming from the latest position, i.e. head of the stream shards. */
+ public static KinesisIngressStartupPosition fromLatest() {
+ return LatestPosition.INSTANCE;
+ }
+
+ /**
+ * Start consuming from position with ingestion timestamps after or equal to a specified {@link
+ * ZonedDateTime}.
+ */
+ public static KinesisIngressStartupPosition fromDate(ZonedDateTime date) {
+ return new DatePosition(date);
+ }
+
+ /** Checks whether this position is configured using the earliest position. */
+ public final boolean isEarliest() {
+ return getClass() == EarliestPosition.class;
+ }
+
+ /** Checks whether this position is configured using the latest position. */
+ public final boolean isLatest() {
+ return getClass() == LatestPosition.class;
+ }
+
+ /** Checks whether this position is configured using a date. */
+ public final boolean isDate() {
+ return getClass() == DatePosition.class;
+ }
+
+ /** Returns this position as a {@link DatePosition}. */
+ public final DatePosition asDate() {
+ if (!isDate()) {
+ throw new IllegalStateException("This is not a startup position configured using a date.");
+ }
+ return (DatePosition) this;
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public static final class EarliestPosition extends KinesisIngressStartupPosition {
+ private static final EarliestPosition INSTANCE = new EarliestPosition();
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public static final class LatestPosition extends KinesisIngressStartupPosition {
+ private static final LatestPosition INSTANCE = new LatestPosition();
+ }
+
+ public static final class DatePosition extends KinesisIngressStartupPosition {
+
+ private final ZonedDateTime date;
+
+ private DatePosition(ZonedDateTime date) {
+ this.date = date;
+ }
+
+ public ZonedDateTime date() {
+ return date;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof DatePosition)) {
+ return false;
+ }
+
+ DatePosition that = (DatePosition) obj;
+ return that.date.equals(date);
+ }
+
+ @Override
+ public int hashCode() {
+ return date.hashCode();
+ }
+ }
+}
diff --git a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
new file mode 100644
index 0000000..bd19b48
--- /dev/null
+++ b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec;
+import org.junit.Test;
+
+public class KinesisIngressBuilderTest {
+
+ private static final IngressIdentifier<String> ID =
+ new IngressIdentifier<>(String.class, "namespace", "name");
+
+ private static final String STREAM_NAME = "test-stream";
+
+ @Test
+ public void exampleUsage() {
+ final KinesisIngressSpec<String> kinesisIngressSpec =
+ KinesisIngressBuilder.forIdentifier(ID)
+ .withDeserializer(TestDeserializer.class)
+ .withStream(STREAM_NAME)
+ .build();
+
+ assertThat(kinesisIngressSpec.id(), is(ID));
+ assertThat(kinesisIngressSpec.streams(), is(Collections.singletonList(STREAM_NAME)));
+ assertTrue(kinesisIngressSpec.awsRegion().isDefault());
+ assertTrue(kinesisIngressSpec.awsCredentials().isDefault());
+ assertEquals(TestDeserializer.class, kinesisIngressSpec.deserializerClass());
+ assertTrue(kinesisIngressSpec.startupPosition().isLatest());
+ assertTrue(kinesisIngressSpec.clientConfigurationProperties().isEmpty());
+ }
+
+ private static final class TestDeserializer implements KinesisIngressDeserializer<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String deserialize(IngressRecord ingressRecord) {
+ return null;
+ }
+ }
+}
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
index 69887fc..9bd3076 100644
--- a/tools/maven/spotbugs-exclude.xml
+++ b/tools/maven/spotbugs-exclude.xml
@@ -65,6 +65,16 @@ under the License.
<Bug pattern="UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"/>
</Match>
+ <!-- These classes allow setting / getting fields with modifiable types without a defensive copy -->
+ <Match>
+ <Class name="~org\.apache.flink\.statefun\.sdk\.kinesis\.ingress\.IngressRecord"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+ <Match>
+ <Class name="~org\.apache.flink\.statefun\.sdk\.kinesis\.ingress\.IngressRecord\$Builder"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
+
<!-- 3rd party -->
<Match>
<Class name="~it\.unimi\.dsi\.fastutil\.objects\.ObjectOpenHashMap"/>