You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/19 04:22:08 UTC

[flink-statefun] 01/07: [FLINK-16124] [kinesis] Add Java Kinesis Ingress SDK classes

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 818fa819bc67716d7b9e41405668a6086c0b5b86
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 00:01:50 2020 +0800

    [FLINK-16124] [kinesis] Add Java Kinesis Ingress SDK classes
---
 .../flink/statefun/sdk/kinesis/KinesisIOTypes.java |  28 ++++
 .../statefun/sdk/kinesis/auth/AwsCredentials.java  | 132 ++++++++++++++++
 .../flink/statefun/sdk/kinesis/auth/AwsRegion.java | 126 ++++++++++++++++
 .../sdk/kinesis/ingress/IngressRecord.java         | 129 ++++++++++++++++
 .../sdk/kinesis/ingress/KinesisIngressBuilder.java | 168 +++++++++++++++++++++
 .../ingress/KinesisIngressDeserializer.java        |  37 +++++
 .../sdk/kinesis/ingress/KinesisIngressSpec.java    |  94 ++++++++++++
 .../ingress/KinesisIngressStartupPosition.java     | 111 ++++++++++++++
 .../sdk/kinesis/KinesisIngressBuilderTest.java     |  66 ++++++++
 tools/maven/spotbugs-exclude.xml                   |  10 ++
 10 files changed, 901 insertions(+)

diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
new file mode 100644
index 0000000..4b57b0a
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis;
+
+import org.apache.flink.statefun.sdk.IngressType;
+
+/** Constants holder for the I/O types of the AWS Kinesis module. */
+public final class KinesisIOTypes {
+
+  /**
+   * The {@link IngressType} constant for Kinesis ingresses, created from the strings {@code
+   * "statefun.kinesis.io"} and {@code "universal-ingress"}.
+   */
+  public static final IngressType UNIVERSAL_INGRESS_TYPE =
+      new IngressType("statefun.kinesis.io", "universal-ingress");
+
+  /** Not instantiable; this class only exposes constants. */
+  private KinesisIOTypes() {}
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsCredentials.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsCredentials.java
new file mode 100644
index 0000000..0d9f5e9
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsCredentials.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.auth;
+
+import java.util.Objects;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+/**
+ * Describes how the AWS credentials for connecting to AWS Kinesis are obtained.
+ *
+ * <p>There are three variants: credentials resolved by AWS's default provider chain, credentials
+ * given directly as access key ID and secret access key, and credentials read from an AWS
+ * configuration profile. Instances are created with the static factory methods; the {@code is*}
+ * methods tell the variants apart and the {@code as*} methods give access to the variant-specific
+ * values.
+ */
+public abstract class AwsCredentials {
+
+  // Private: only the nested variants declared in this file can subclass.
+  private AwsCredentials() {}
+
+  /**
+   * Consults AWS's default provider chain to determine the AWS credentials.
+   *
+   * @return the shared default-provider-chain credentials instance.
+   */
+  public static AwsCredentials fromDefaultProviderChain() {
+    return DefaultAwsCredentials.INSTANCE;
+  }
+
+  /**
+   * Specifies the AWS credentials directly with provided access key ID and secret access key
+   * strings.
+   *
+   * @param accessKeyId the access key ID; must not be {@code null}.
+   * @param secretAccessKey the secret access key; must not be {@code null}.
+   * @return credentials holding the given key pair.
+   * @throws NullPointerException if either argument is {@code null}.
+   */
+  public static AwsCredentials basic(String accessKeyId, String secretAccessKey) {
+    return new BasicAwsCredentials(accessKeyId, secretAccessKey);
+  }
+
+  /**
+   * Specifies the AWS credentials using an AWS configuration profile, without a profile path.
+   *
+   * @param profileName the name of the profile; must not be {@code null}.
+   * @return credentials referring to the named profile.
+   * @throws NullPointerException if {@code profileName} is {@code null}.
+   */
+  public static AwsCredentials profile(String profileName) {
+    return profile(profileName, null);
+  }
+
+  /**
+   * Specifies the AWS credentials using an AWS configuration profile, along with the profile's
+   * configuration path.
+   *
+   * @param profileName the name of the profile; must not be {@code null}.
+   * @param profilePath the path of the profile configuration; may be {@code null}.
+   * @return credentials referring to the named profile.
+   * @throws NullPointerException if {@code profileName} is {@code null}.
+   */
+  public static AwsCredentials profile(String profileName, String profilePath) {
+    return new ProfileAwsCredentials(profileName, profilePath);
+  }
+
+  /**
+   * Checks whether the credentials is configured to be obtained from AWS's default provider chain.
+   */
+  public boolean isDefault() {
+    return this instanceof DefaultAwsCredentials;
+  }
+
+  /**
+   * Checks whether the credentials is specified using directly provided access key ID and secret
+   * access key strings.
+   */
+  public boolean isBasic() {
+    return this instanceof BasicAwsCredentials;
+  }
+
+  /** Checks whether the credentials is configured using AWS configuration profiles. */
+  public boolean isProfile() {
+    return this instanceof ProfileAwsCredentials;
+  }
+
+  /**
+   * Returns this as a {@link BasicAwsCredentials}.
+   *
+   * @throws IllegalStateException if {@link #isBasic()} is {@code false}.
+   */
+  public BasicAwsCredentials asBasic() {
+    if (isBasic()) {
+      return (BasicAwsCredentials) this;
+    }
+    throw new IllegalStateException(
+        "This AWS credential is not defined with basic access key id and secret key.");
+  }
+
+  /**
+   * Returns this as a {@link ProfileAwsCredentials}.
+   *
+   * @throws IllegalStateException if {@link #isProfile()} is {@code false}.
+   */
+  public ProfileAwsCredentials asProfile() {
+    if (isProfile()) {
+      return (ProfileAwsCredentials) this;
+    }
+    throw new IllegalStateException(
+        "This AWS credential is not defined with a AWS configuration profile");
+  }
+
+  /** Variant for credentials resolved through AWS's default provider chain; a singleton. */
+  public static final class DefaultAwsCredentials extends AwsCredentials {
+    private static final DefaultAwsCredentials INSTANCE = new DefaultAwsCredentials();
+  }
+
+  /** Variant holding a directly provided access key ID and secret access key. */
+  public static final class BasicAwsCredentials extends AwsCredentials {
+    private final String accessKeyId;
+    private final String secretAccessKey;
+
+    BasicAwsCredentials(String accessKeyId, String secretAccessKey) {
+      this.accessKeyId = Objects.requireNonNull(accessKeyId);
+      this.secretAccessKey = Objects.requireNonNull(secretAccessKey);
+    }
+
+    /** @return the access key ID; never {@code null}. */
+    public String accessKeyId() {
+      return this.accessKeyId;
+    }
+
+    /** @return the secret access key; never {@code null}. */
+    public String secretAccessKey() {
+      return this.secretAccessKey;
+    }
+  }
+
+  /** Variant referring to an AWS configuration profile, optionally with its path. */
+  public static final class ProfileAwsCredentials extends AwsCredentials {
+    private final String profileName;
+    @Nullable private final String profilePath;
+
+    ProfileAwsCredentials(String profileName, @Nullable String profilePath) {
+      this.profileName = Objects.requireNonNull(profileName);
+      this.profilePath = profilePath;
+    }
+
+    /** @return the profile name; never {@code null}. */
+    public String name() {
+      return this.profileName;
+    }
+
+    /** @return the profile path, or an empty {@link Optional} if none was given. */
+    public Optional<String> path() {
+      return Optional.ofNullable(this.profilePath);
+    }
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsRegion.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsRegion.java
new file mode 100644
index 0000000..511aeec
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsRegion.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.auth;
+
+import java.net.URI;
+import java.util.Objects;
+
+/**
+ * AWS region to use for connecting to AWS Kinesis.
+ *
+ * <p>A region is one of three variants: resolved through AWS's default provider chain, specified
+ * by its unique id, or reached through a custom service endpoint. Use the {@code is*} methods to
+ * tell the variants apart and the {@code as*} methods to access variant-specific settings.
+ */
+public abstract class AwsRegion {
+
+  // Private so that only the nested variants declared in this file can extend this class.
+  private AwsRegion() {}
+
+  /**
+   * Consults AWS's default provider chain to determine the AWS region.
+   *
+   * @return the shared default-provider-chain region instance.
+   */
+  public static AwsRegion fromDefaultProviderChain() {
+    return DefaultAwsRegion.INSTANCE;
+  }
+
+  /**
+   * Specifies an AWS region using the region's unique id.
+   *
+   * @param id the region's unique id; must not be {@code null}.
+   * @return a region identified by the given id.
+   * @throws NullPointerException if {@code id} is {@code null}.
+   */
+  public static AwsRegion ofId(String id) {
+    return new SpecificIdAwsRegion(id);
+  }
+
+  /**
+   * Connects to an AWS region through a non-standard AWS service endpoint. This is typically used
+   * only for development and testing purposes.
+   *
+   * @param serviceEndpoint the endpoint URL; must be a syntactically valid URI using the {@code
+   *     https} scheme.
+   * @param regionId the region id to use with the endpoint; must not be {@code null}.
+   * @return a region reached through the given endpoint.
+   * @throws NullPointerException if either argument is {@code null}.
+   * @throws IllegalArgumentException if {@code serviceEndpoint} is not a valid URI, has no scheme,
+   *     or uses a scheme other than {@code https}.
+   */
+  public static AwsRegion ofCustomEndpoint(String serviceEndpoint, String regionId) {
+    return new CustomEndpointAwsRegion(serviceEndpoint, regionId);
+  }
+
+  /** Checks whether the region is configured to be obtained from AWS's default provider chain. */
+  public boolean isDefault() {
+    return getClass() == DefaultAwsRegion.class;
+  }
+
+  /** Checks whether the region is specified with the region's unique id. */
+  public boolean isId() {
+    return getClass() == SpecificIdAwsRegion.class;
+  }
+
+  /** Checks whether the region is specified with a custom non-standard AWS service endpoint. */
+  public boolean isCustomEndpoint() {
+    return getClass() == CustomEndpointAwsRegion.class;
+  }
+
+  /**
+   * Returns this region as a {@link SpecificIdAwsRegion}.
+   *
+   * @throws IllegalStateException if {@link #isId()} is {@code false}.
+   */
+  public SpecificIdAwsRegion asId() {
+    if (!isId()) {
+      throw new IllegalStateException(
+          "This is not an AWS region specified with using the region's unique id.");
+    }
+    return (SpecificIdAwsRegion) this;
+  }
+
+  /**
+   * Returns this region as a {@link CustomEndpointAwsRegion}.
+   *
+   * @throws IllegalStateException if {@link #isCustomEndpoint()} is {@code false}.
+   */
+  public CustomEndpointAwsRegion asCustomEndpoint() {
+    if (!isCustomEndpoint()) {
+      throw new IllegalStateException(
+          "This is not an AWS region specified with a custom endpoint.");
+    }
+    return (CustomEndpointAwsRegion) this;
+  }
+
+  /** Variant for a region resolved through AWS's default provider chain; a singleton. */
+  public static final class DefaultAwsRegion extends AwsRegion {
+    private static final DefaultAwsRegion INSTANCE = new DefaultAwsRegion();
+  }
+
+  /** Variant for a region identified by its unique id. */
+  public static final class SpecificIdAwsRegion extends AwsRegion {
+    private final String regionId;
+
+    SpecificIdAwsRegion(String regionId) {
+      this.regionId = Objects.requireNonNull(regionId);
+    }
+
+    /** @return the region's unique id; never {@code null}. */
+    public String id() {
+      return regionId;
+    }
+  }
+
+  /** Variant for a region reached through a custom HTTPS service endpoint. */
+  public static final class CustomEndpointAwsRegion extends AwsRegion {
+    private final String serviceEndpoint;
+    private final String regionId;
+
+    CustomEndpointAwsRegion(String serviceEndpoint, String regionId) {
+      this.serviceEndpoint = requireValidEndpoint(serviceEndpoint);
+      this.regionId = Objects.requireNonNull(regionId);
+    }
+
+    /** @return the service endpoint exactly as it was provided; never {@code null}. */
+    public String serviceEndpoint() {
+      return serviceEndpoint;
+    }
+
+    /** @return the region id to use with the endpoint; never {@code null}. */
+    public String regionId() {
+      return regionId;
+    }
+
+    /**
+     * Validates that the endpoint parses as a URI and uses the {@code https} scheme.
+     *
+     * @param serviceEndpoint the endpoint to validate.
+     * @return the unchanged {@code serviceEndpoint}.
+     * @throws NullPointerException if {@code serviceEndpoint} is {@code null}.
+     * @throws IllegalArgumentException if the endpoint is not a valid URI, has no scheme, or uses
+     *     a scheme other than {@code https}.
+     */
+    private static String requireValidEndpoint(String serviceEndpoint) {
+      Objects.requireNonNull(serviceEndpoint);
+
+      final URI uri = URI.create(serviceEndpoint);
+      // URI#getScheme() returns null for scheme-less input such as "localhost". Comparing from
+      // the literal side rejects that input with the descriptive error below instead of failing
+      // with a NullPointerException.
+      if (!"https".equalsIgnoreCase(uri.getScheme())) {
+        throw new IllegalArgumentException(
+            "Invalid service endpoint url: "
+                + serviceEndpoint
+                + "; Only custom service endpoints using HTTPS are supported");
+      }
+
+      return serviceEndpoint;
+    }
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/IngressRecord.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/IngressRecord.java
new file mode 100644
index 0000000..cbfc1f7
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/IngressRecord.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.util.Objects;
+
+/**
+ * A record consumed from AWS Kinesis.
+ *
+ * <p>Instances are created through {@link #newBuilder()}. All fields except the arrival timestamp
+ * are required; {@link Builder#build()} fails if any of them was not set. The data byte array is
+ * stored and returned as-is, without copying.
+ */
+public final class IngressRecord {
+  private final byte[] data;
+  private final String stream;
+  private final String shardId;
+  private final String partitionKey;
+  private final String sequenceNumber;
+  private final long approximateArrivalTimestamp;
+
+  /** @return A builder for a {@link IngressRecord}. */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  private IngressRecord(
+      byte[] data,
+      String stream,
+      String shardId,
+      String partitionKey,
+      String sequenceNumber,
+      long approximateArrivalTimestamp) {
+    this.data = Objects.requireNonNull(data, "data bytes");
+    this.stream = Objects.requireNonNull(stream, "source stream");
+    this.shardId = Objects.requireNonNull(shardId, "source shard id");
+    this.partitionKey = Objects.requireNonNull(partitionKey, "partition key");
+    this.sequenceNumber = Objects.requireNonNull(sequenceNumber, "sequence number");
+    this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+  }
+
+  /** @return consumed data bytes; this is the array given to the builder, not a copy */
+  public byte[] getData() {
+    return data;
+  }
+
+  /** @return source AWS Kinesis stream */
+  public String getStream() {
+    return stream;
+  }
+
+  /** @return source AWS Kinesis stream shard */
+  public String getShardId() {
+    return shardId;
+  }
+
+  /** @return attached partition key */
+  public String getPartitionKey() {
+    return partitionKey;
+  }
+
+  /** @return sequence number of the consumed record */
+  public String getSequenceNumber() {
+    return sequenceNumber;
+  }
+
+  /**
+   * @return approximate arrival timestamp (ingestion time at AWS Kinesis) of the consumed record
+   */
+  public long getApproximateArrivalTimestamp() {
+    return approximateArrivalTimestamp;
+  }
+
+  /**
+   * Builder for {@link IngressRecord}.
+   *
+   * <p>The setters store the given values without validation; required values are checked in
+   * {@link #build()}.
+   */
+  public static final class Builder {
+    private byte[] data;
+    private String stream;
+    private String shardId;
+    private String partitionKey;
+    private String sequenceNumber;
+    // Private like the other builder fields; left unset it keeps the primitive default of 0.
+    private long approximateArrivalTimestamp;
+
+    private Builder() {}
+
+    /** @param data the consumed data bytes; stored without copying. */
+    public Builder withData(byte[] data) {
+      this.data = data;
+      return this;
+    }
+
+    /** @param stream the source AWS Kinesis stream. */
+    public Builder withStream(String stream) {
+      this.stream = stream;
+      return this;
+    }
+
+    /** @param shardId the source AWS Kinesis stream shard. */
+    public Builder withShardId(String shardId) {
+      this.shardId = shardId;
+      return this;
+    }
+
+    /** @param partitionKey the partition key attached to the record. */
+    public Builder withPartitionKey(String partitionKey) {
+      this.partitionKey = partitionKey;
+      return this;
+    }
+
+    /** @param sequenceNumber the sequence number of the record. */
+    public Builder withSequenceNumber(String sequenceNumber) {
+      this.sequenceNumber = sequenceNumber;
+      return this;
+    }
+
+    /** @param approximateArrivalTimestamp the approximate arrival timestamp of the record. */
+    public Builder withApproximateArrivalTimestamp(long approximateArrivalTimestamp) {
+      this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+      return this;
+    }
+
+    /**
+     * @return a new {@link IngressRecord} with the values set on this builder.
+     * @throws NullPointerException if the data, stream, shard id, partition key or sequence
+     *     number was not set.
+     */
+    public IngressRecord build() {
+      return new IngressRecord(
+          data, stream, shardId, partitionKey, sequenceNumber, approximateArrivalTimestamp);
+    }
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
new file mode 100644
index 0000000..2d71ae6
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+/**
+ * A builder for creating an {@link IngressSpec} for consuming data from AWS Kinesis.
+ *
+ * <p>At least one stream and a deserializer class must be configured before calling {@link
+ * #build()}. The startup position defaults to {@link KinesisIngressStartupPosition#fromLatest()},
+ * and both the AWS region and the AWS credentials default to AWS's default provider chain.
+ *
+ * @param <T> The type consumed from AWS Kinesis.
+ */
+public final class KinesisIngressBuilder<T> {
+
+  private final IngressIdentifier<T> id;
+
+  private final List<String> streams = new ArrayList<>();
+  private Class<? extends KinesisIngressDeserializer<T>> deserializerClass;
+  private KinesisIngressStartupPosition startupPosition =
+      KinesisIngressStartupPosition.fromLatest();
+  private AwsRegion awsRegion = AwsRegion.fromDefaultProviderChain();
+  private AwsCredentials awsCredentials = AwsCredentials.fromDefaultProviderChain();
+  private final Properties clientConfigurationProperties = new Properties();
+
+  private KinesisIngressBuilder(IngressIdentifier<T> id) {
+    this.id = Objects.requireNonNull(id);
+  }
+
+  /**
+   * @param id A unique ingress identifier.
+   * @param <T> The type consumed from Kinesis.
+   * @return A new {@link KinesisIngressBuilder}.
+   */
+  public static <T> KinesisIngressBuilder<T> forIdentifier(IngressIdentifier<T> id) {
+    return new KinesisIngressBuilder<>(id);
+  }
+
+  /**
+   * Adds a stream to consume from. Can be called multiple times to consume several streams.
+   *
+   * @param stream The name of a stream that should be consumed; must not be {@code null}.
+   */
+  public KinesisIngressBuilder<T> withStream(String stream) {
+    // Reject null here, like the other setters do, so that a null stream name does not end up in
+    // the built spec.
+    this.streams.add(Objects.requireNonNull(stream));
+    return this;
+  }
+
+  /**
+   * Adds all given streams to the streams to consume from.
+   *
+   * @param streams A list of streams that should be consumed.
+   */
+  public KinesisIngressBuilder<T> withStreams(List<String> streams) {
+    this.streams.addAll(streams);
+    return this;
+  }
+
+  /**
+   * @param deserializerClass The deserializer used to convert between Kinesis's byte messages and
+   *     Java objects.
+   */
+  public KinesisIngressBuilder<T> withDeserializer(
+      Class<? extends KinesisIngressDeserializer<T>> deserializerClass) {
+    this.deserializerClass = Objects.requireNonNull(deserializerClass);
+    return this;
+  }
+
+  /**
+   * Configures the position that the ingress should start consuming from. By default, the startup
+   * position is {@link KinesisIngressStartupPosition#fromLatest()}.
+   *
+   * <p>Note that this configuration only affects the position when starting the application from a
+   * fresh start. When restoring the application from a savepoint, the ingress will always start
+   * consuming from the position persisted in the savepoint.
+   *
+   * @param startupPosition the position that the Kinesis ingress should start consuming from.
+   * @see KinesisIngressStartupPosition
+   */
+  public KinesisIngressBuilder<T> withStartupPosition(
+      KinesisIngressStartupPosition startupPosition) {
+    this.startupPosition = Objects.requireNonNull(startupPosition);
+    return this;
+  }
+
+  /**
+   * The AWS region to connect to. By default, AWS's default provider chain is consulted.
+   *
+   * @param awsRegion The AWS region to connect to.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-region-selection.html#automatically-determine-the-aws-region-from-the-environment">Automatically
+   *     Determine the AWS Region from the Environment</a>.
+   * @see AwsRegion
+   */
+  public KinesisIngressBuilder<T> withAwsRegion(AwsRegion awsRegion) {
+    this.awsRegion = Objects.requireNonNull(awsRegion);
+    return this;
+  }
+
+  /**
+   * The AWS region to connect to, specified by the AWS region's unique id. By default, AWS's
+   * default provider chain is consulted.
+   *
+   * @param regionName The unique id of the AWS region to connect to.
+   */
+  public KinesisIngressBuilder<T> withAwsRegion(String regionName) {
+    this.awsRegion = AwsRegion.ofId(regionName);
+    return this;
+  }
+
+  /**
+   * The AWS credentials to use. By default, AWS's default provider chain is consulted.
+   *
+   * @param awsCredentials The AWS credentials to use.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default">Using
+   *     the Default Credential Provider Chain</a>.
+   * @see AwsCredentials
+   */
+  public KinesisIngressBuilder<T> withAwsCredentials(AwsCredentials awsCredentials) {
+    this.awsCredentials = Objects.requireNonNull(awsCredentials);
+    return this;
+  }
+
+  /**
+   * Sets a AWS client configuration to be used by the ingress.
+   *
+   * <p>Supported values are properties of AWS's <a
+   * href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.aws.ClientConfiguration</a>.
+   * For example, to set a value for {@code SOCKET_TIMEOUT}, the property key would be {@code
+   * SocketTimeout}.
+   *
+   * @param key the property to set.
+   * @param value the value for the property.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.aws.ClientConfiguration</a>.
+   */
+  public KinesisIngressBuilder<T> withClientConfigurationProperty(String key, String value) {
+    Objects.requireNonNull(key);
+    Objects.requireNonNull(value);
+    this.clientConfigurationProperties.setProperty(key, value);
+    return this;
+  }
+
+  /**
+   * Creates the ingress spec from the current state of this builder.
+   *
+   * @return A new {@link KinesisIngressSpec}.
+   * @throws NullPointerException if no deserializer class was configured.
+   * @throws IllegalArgumentException if no stream was configured.
+   */
+  public KinesisIngressSpec<T> build() {
+    // Hand the spec its own snapshots of the mutable collections, so that continuing to use this
+    // builder cannot change a spec that was already built.
+    final List<String> streamsSnapshot = new ArrayList<>(streams);
+    final Properties propertiesSnapshot = new Properties();
+    propertiesSnapshot.putAll(clientConfigurationProperties);
+
+    return new KinesisIngressSpec<>(
+        id,
+        streamsSnapshot,
+        deserializerClass,
+        startupPosition,
+        awsRegion,
+        awsCredentials,
+        propertiesSnapshot);
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
new file mode 100644
index 0000000..226d0fb
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.io.Serializable;
+
+/**
+ * Describes how to deserialize {@link IngressRecord}s consumed from AWS Kinesis into data types
+ * that are processed by the system.
+ *
+ * <p>The interface extends {@link Serializable}, so every implementation is serializable as well.
+ * Deserializers are handed to {@link KinesisIngressBuilder#withDeserializer(Class)} as a class
+ * rather than as an instance. NOTE(review): presumably the runtime instantiates that class itself,
+ * in which case implementations need an accessible no-argument constructor; confirm against the
+ * runtime code.
+ *
+ * @param <T> The type created by the ingress deserializer.
+ */
+public interface KinesisIngressDeserializer<T> extends Serializable {
+
+  /**
+   * Deserialize an input value from a {@link IngressRecord} consumed from AWS Kinesis.
+   *
+   * @param ingressRecord the {@link IngressRecord} consumed from AWS Kinesis.
+   * @return the deserialized data object.
+   */
+  T deserialize(IngressRecord ingressRecord);
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
new file mode 100644
index 0000000..9bc4a9f
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+/**
+ * An {@link IngressSpec} holding the configuration of an AWS Kinesis ingress.
+ *
+ * <p>Instances are created by {@link KinesisIngressBuilder#build()}; the constructor is not
+ * public. The stream list and client configuration properties are stored and returned as given,
+ * without copying.
+ *
+ * @param <T> The type consumed from AWS Kinesis.
+ */
+public final class KinesisIngressSpec<T> implements IngressSpec<T> {
+  private final IngressIdentifier<T> ingressIdentifier;
+  private final List<String> streams;
+  private final Class<? extends KinesisIngressDeserializer<T>> deserializerClass;
+  private final KinesisIngressStartupPosition startupPosition;
+  private final AwsRegion awsRegion;
+  private final AwsCredentials awsCredentials;
+  private final Properties clientConfigurationProperties;
+
+  /**
+   * Validates and stores the ingress configuration.
+   *
+   * @throws NullPointerException if any argument is {@code null}.
+   * @throws IllegalArgumentException if {@code streams} is empty.
+   */
+  KinesisIngressSpec(
+      IngressIdentifier<T> ingressIdentifier,
+      List<String> streams,
+      Class<? extends KinesisIngressDeserializer<T>> deserializerClass,
+      KinesisIngressStartupPosition startupPosition,
+      AwsRegion awsRegion,
+      AwsCredentials awsCredentials,
+      Properties clientConfigurationProperties) {
+    this.ingressIdentifier = Objects.requireNonNull(ingressIdentifier, "ingress identifier");
+    this.deserializerClass = Objects.requireNonNull(deserializerClass, "deserializer class");
+    this.startupPosition = Objects.requireNonNull(startupPosition, "startup position");
+    this.awsRegion = Objects.requireNonNull(awsRegion, "AWS region configuration");
+    this.awsCredentials = Objects.requireNonNull(awsCredentials, "AWS credentials configuration");
+    this.clientConfigurationProperties = Objects.requireNonNull(clientConfigurationProperties);
+
+    // The stream list is checked last: first for null, then for containing at least one stream.
+    this.streams = Objects.requireNonNull(streams, "AWS Kinesis stream names");
+    if (this.streams.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Must have at least one stream to consume from specified.");
+    }
+  }
+
+  /** @return the identifier of this ingress. */
+  @Override
+  public IngressIdentifier<T> id() {
+    return this.ingressIdentifier;
+  }
+
+  /** @return {@link KinesisIOTypes#UNIVERSAL_INGRESS_TYPE}. */
+  @Override
+  public IngressType type() {
+    return KinesisIOTypes.UNIVERSAL_INGRESS_TYPE;
+  }
+
+  /** @return the names of the streams to consume from; never empty. */
+  public List<String> streams() {
+    return this.streams;
+  }
+
+  /** @return the class of the deserializer for consumed records. */
+  public Class<? extends KinesisIngressDeserializer<T>> deserializerClass() {
+    return this.deserializerClass;
+  }
+
+  /** @return the position the ingress starts consuming from. */
+  public KinesisIngressStartupPosition startupPosition() {
+    return this.startupPosition;
+  }
+
+  /** @return the AWS region configuration. */
+  public AwsRegion awsRegion() {
+    return this.awsRegion;
+  }
+
+  /** @return the AWS credentials configuration. */
+  public AwsCredentials awsCredentials() {
+    return this.awsCredentials;
+  }
+
+  /** @return the AWS client configuration properties. */
+  public Properties clientConfigurationProperties() {
+    return this.clientConfigurationProperties;
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressStartupPosition.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressStartupPosition.java
new file mode 100644
index 0000000..58aec1e
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressStartupPosition.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.time.ZonedDateTime;
+import java.util.Objects;
+
+/**
+ * Position for the ingress to start consuming AWS Kinesis shards.
+ *
+ * <p>Instances are obtained through the static factories {@link #fromEarliest()}, {@link
+ * #fromLatest()} and {@link #fromDate(ZonedDateTime)}. The kind of a position can be inspected
+ * with {@link #isEarliest()}, {@link #isLatest()} and {@link #isDate()}.
+ */
+public abstract class KinesisIngressStartupPosition {
+
+  // Private, so the only subclasses are the nested position types declared in this class.
+  private KinesisIngressStartupPosition() {}
+
+  /** Start consuming from the earliest position possible. */
+  public static KinesisIngressStartupPosition fromEarliest() {
+    return EarliestPosition.INSTANCE;
+  }
+
+  /** Start consuming from the latest position, i.e. head of the stream shards. */
+  public static KinesisIngressStartupPosition fromLatest() {
+    return LatestPosition.INSTANCE;
+  }
+
+  /**
+   * Start consuming from position with ingestion timestamps after or equal to a specified {@link
+   * ZonedDateTime}.
+   *
+   * @param date the date to start consuming from; must not be {@code null}
+   * @throws NullPointerException if {@code date} is {@code null}
+   */
+  public static KinesisIngressStartupPosition fromDate(ZonedDateTime date) {
+    return new DatePosition(date);
+  }
+
+  /** Checks whether this position is configured using the earliest position. */
+  public final boolean isEarliest() {
+    return getClass() == EarliestPosition.class;
+  }
+
+  /** Checks whether this position is configured using the latest position. */
+  public final boolean isLatest() {
+    return getClass() == LatestPosition.class;
+  }
+
+  /** Checks whether this position is configured using a date. */
+  public final boolean isDate() {
+    return getClass() == DatePosition.class;
+  }
+
+  /**
+   * Returns this position as a {@link DatePosition}.
+   *
+   * @throws IllegalStateException if this position was not configured using a date
+   */
+  public final DatePosition asDate() {
+    if (!isDate()) {
+      throw new IllegalStateException("This is not a startup position configured using a date.");
+    }
+    return (DatePosition) this;
+  }
+
+  /** Position type returned by {@link #fromEarliest()}. */
+  @SuppressWarnings("WeakerAccess")
+  public static final class EarliestPosition extends KinesisIngressStartupPosition {
+    private static final EarliestPosition INSTANCE = new EarliestPosition();
+  }
+
+  /** Position type returned by {@link #fromLatest()}. */
+  @SuppressWarnings("WeakerAccess")
+  public static final class LatestPosition extends KinesisIngressStartupPosition {
+    private static final LatestPosition INSTANCE = new LatestPosition();
+  }
+
+  /** Position type returned by {@link #fromDate(ZonedDateTime)}; carries the configured date. */
+  public static final class DatePosition extends KinesisIngressStartupPosition {
+
+    private final ZonedDateTime date;
+
+    private DatePosition(ZonedDateTime date) {
+      // Fail fast here: a null date would otherwise only surface later, as a
+      // NullPointerException thrown from equals() / hashCode().
+      this.date = Objects.requireNonNull(date, "date");
+    }
+
+    /** Returns the non-null date this position was configured with. */
+    public ZonedDateTime date() {
+      return date;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      // instanceof is false for null, so no separate null check is needed.
+      if (!(obj instanceof DatePosition)) {
+        return false;
+      }
+
+      DatePosition that = (DatePosition) obj;
+      return date.equals(that.date);
+    }
+
+    @Override
+    public int hashCode() {
+      return date.hashCode();
+    }
+  }
+}
diff --git a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
new file mode 100644
index 0000000..bd19b48
--- /dev/null
+++ b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec;
+import org.junit.Test;
+
+/** Example-style test showing how a {@link KinesisIngressSpec} is assembled with the builder. */
+public class KinesisIngressBuilderTest {
+
+  private static final String STREAM_NAME = "test-stream";
+
+  private static final IngressIdentifier<String> ID =
+      new IngressIdentifier<>(String.class, "namespace", "name");
+
+  @Test
+  public void exampleUsage() {
+    final KinesisIngressSpec<String> spec =
+        KinesisIngressBuilder.forIdentifier(ID)
+            .withDeserializer(TestDeserializer.class)
+            .withStream(STREAM_NAME)
+            .build();
+
+    // Values that were set explicitly on the builder.
+    assertEquals(ID, spec.id());
+    assertEquals(TestDeserializer.class, spec.deserializerClass());
+    assertThat(spec.streams(), is(Collections.singletonList(STREAM_NAME)));
+
+    // Values that were left untouched on the builder.
+    assertTrue(spec.awsRegion().isDefault());
+    assertTrue(spec.awsCredentials().isDefault());
+    assertTrue(spec.startupPosition().isLatest());
+    assertTrue(spec.clientConfigurationProperties().isEmpty());
+  }
+
+  /** Deserializer stub; only its class is handed to the builder, and it always yields null. */
+  private static final class TestDeserializer implements KinesisIngressDeserializer<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public String deserialize(IngressRecord ingressRecord) {
+      return null;
+    }
+  }
+}
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
index 69887fc..9bd3076 100644
--- a/tools/maven/spotbugs-exclude.xml
+++ b/tools/maven/spotbugs-exclude.xml
@@ -65,6 +65,16 @@ under the License.
         <Bug pattern="UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"/>
     </Match>
 
+    <!-- These classes allow setting / getting fields with modifiable types without a defensive copy -->
+    <Match>
+        <Class name="~org\.apache\.flink\.statefun\.sdk\.kinesis\.ingress\.IngressRecord"/>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
+    <Match>
+        <Class name="~org\.apache\.flink\.statefun\.sdk\.kinesis\.ingress\.IngressRecord\$Builder"/>
+        <Bug pattern="EI_EXPOSE_REP2"/>
+    </Match>
+
     <!-- 3rd party -->
     <Match>
         <Class name="~it\.unimi\.dsi\.fastutil\.objects\.ObjectOpenHashMap"/>