You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/19 04:22:07 UTC

[flink-statefun] branch master updated (39f9916 -> 4ac1dba)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 39f9916  [FLINK-16603] Fix quickstart Maven archetype
     add b7a3802  [FLINK-16124] [kinesis] Add statefun-kinesis-io module
     new 818fa81  [FLINK-16124] [kinesis] Add Java Kinesis Ingress SDK classes
     new b471831  [FLINK-16124] [kinesis] Add Kinesis IO dependencies to statefun-flink-io-bundle
     new 2cd0544  [FLINK-16124] [kinesis] Implement runtime KinesisSourceProvider
     new 059a9e8  [FLINK-16124] [kinesis] Bind KinesisIngressProvider to Java Kinesis Ingress Type
     new f482309  [FLINK-16124] [kinesis] Add Java Kinesis Egress SDK classes
     new 2668eb1  [FLINK-16124] [kinesis] Implement runtime KinesisSinkProvider
     new 4ac1dba  [FLINK-16124] [kinesis] Bind KinesisSinkProvider to Java Kinesis Egress Type

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |   1 +
 statefun-flink/statefun-flink-io-bundle/pom.xml    |  18 ++
 .../flink/io/kinesis/AwsAuthConfigProperties.java  | 121 ++++++++++++++
 .../CachingPartitionerSerializerDelegate.java      |  85 ++++++++++
 .../KinesisDeserializationSchemaDelegate.java      |  63 +++++++
 .../flink/io/kinesis/KinesisFlinkIOModule.java     |  13 +-
 .../flink/io/kinesis/KinesisSinkProvider.java      |  73 ++++++++
 .../flink/io/kinesis/KinesisSourceProvider.java    | 121 ++++++++++++++
 .../io/kinesis/AwsAuthConfigPropertiesTest.java    | 185 +++++++++++++++++++++
 .../CachingPartitionerSerializerDelegateTest.java  | 104 ++++++++++++
 .../flink/io/kinesis/KinesisSinkProviderTest.java  |  62 +++++++
 .../io/kinesis/KinesisSourceProviderTest.java      |  66 ++++++++
 {statefun-testutil => statefun-kinesis-io}/pom.xml |  12 +-
 .../flink/statefun/sdk/kinesis/KinesisIOTypes.java |  17 +-
 .../statefun/sdk/kinesis/auth/AwsCredentials.java  | 132 +++++++++++++++
 .../flink/statefun/sdk/kinesis/auth/AwsRegion.java | 126 ++++++++++++++
 .../statefun/sdk/kinesis/egress/EgressRecord.java  |  98 +++++++++++
 .../sdk/kinesis/egress/KinesisEgressBuilder.java   | 149 +++++++++++++++++
 .../kinesis/egress/KinesisEgressSerializer.java    |  20 +--
 .../sdk/kinesis/egress/KinesisEgressSpec.java      |  81 +++++++++
 .../sdk/kinesis/ingress/IngressRecord.java         | 129 ++++++++++++++
 .../sdk/kinesis/ingress/KinesisIngressBuilder.java | 168 +++++++++++++++++++
 .../ingress/KinesisIngressDeserializer.java        |  20 +--
 .../sdk/kinesis/ingress/KinesisIngressSpec.java    |  94 +++++++++++
 .../ingress/KinesisIngressStartupPosition.java     | 111 +++++++++++++
 .../sdk/kinesis/KinesisEgressBuilderTest.java      |  58 +++++++
 .../sdk/kinesis/KinesisIngressBuilderTest.java     |  66 ++++++++
 tools/maven/spotbugs-exclude.xml                   |  18 ++
 28 files changed, 2165 insertions(+), 46 deletions(-)
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegate.java
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisDeserializationSchemaDelegate.java
 copy statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/custom/flink/MyFlinkIoModule.java => statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java (68%)
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegateTest.java
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProviderTest.java
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProviderTest.java
 copy {statefun-testutil => statefun-kinesis-io}/pom.xml (86%)
 copy statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/FlinkIoModule.java => statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java (71%)
 create mode 100644 statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsCredentials.java
 create mode 100644 statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsRegion.java
 create mode 100644 statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/EgressRecord.java
 create mode 100644 statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressBuilder.java
 copy statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/BootstrapDataRouterProvider.java => statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java (62%)
 create mode 100644 statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSpec.java
 create mode 100644 statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/IngressRecord.java
 create mode 100644 statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
 copy statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/BootstrapDataRouterProvider.java => statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java (60%)
 create mode 100644 statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
 create mode 100644 statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressStartupPosition.java
 create mode 100644 statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisEgressBuilderTest.java
 create mode 100644 statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java


[flink-statefun] 02/07: [FLINK-16124] [kinesis] Add Kinesis IO dependencies to statefun-flink-io-bundle

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit b47183131b57504432e10c389798a89d95364ed2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 00:03:55 2020 +0800

    [FLINK-16124] [kinesis] Add Kinesis IO dependencies to statefun-flink-io-bundle
---
 statefun-flink/statefun-flink-io-bundle/pom.xml | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/statefun-flink/statefun-flink-io-bundle/pom.xml b/statefun-flink/statefun-flink-io-bundle/pom.xml
index 4762f30..a62f41d 100644
--- a/statefun-flink/statefun-flink-io-bundle/pom.xml
+++ b/statefun-flink/statefun-flink-io-bundle/pom.xml
@@ -56,6 +56,11 @@ under the License.
             <artifactId>statefun-kafka-io</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-kinesis-io</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <!-- flink  -->
         <dependency>
@@ -71,6 +76,13 @@ under the License.
             <version>${flink.version}</version>
         </dependency>
 
+        <!-- flink kinesis connector -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
         <!-- 3rd party -->
         <dependency>
             <groupId>com.google.protobuf</groupId>


[flink-statefun] 05/07: [FLINK-16124] [kinesis] Add Java Kinesis Egress SDK classes

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit f4823097e11c45c9c289100bb6e046f215aff7e8
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 11:24:05 2020 +0800

    [FLINK-16124] [kinesis] Add Java Kinesis Egress SDK classes
---
 .../flink/statefun/sdk/kinesis/KinesisIOTypes.java |   3 +
 .../statefun/sdk/kinesis/egress/EgressRecord.java  |  98 ++++++++++++++
 .../sdk/kinesis/egress/KinesisEgressBuilder.java   | 149 +++++++++++++++++++++
 .../KinesisEgressSerializer.java}                  |  23 +++-
 .../sdk/kinesis/egress/KinesisEgressSpec.java      |  81 +++++++++++
 .../sdk/kinesis/KinesisEgressBuilderTest.java      |  58 ++++++++
 tools/maven/spotbugs-exclude.xml                   |   8 ++
 7 files changed, 413 insertions(+), 7 deletions(-)

diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
index 4b57b0a..346451e 100644
--- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.statefun.sdk.kinesis;
 
+import org.apache.flink.statefun.sdk.EgressType;
 import org.apache.flink.statefun.sdk.IngressType;
 
 public final class KinesisIOTypes {
@@ -25,4 +26,6 @@ public final class KinesisIOTypes {
 
   public static final IngressType UNIVERSAL_INGRESS_TYPE =
       new IngressType("statefun.kinesis.io", "universal-ingress");
+  public static final EgressType UNIVERSAL_EGRESS_TYPE =
+      new EgressType("statefun.kinesis.io", "universal-egress");
 }
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/EgressRecord.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/EgressRecord.java
new file mode 100644
index 0000000..ac8c5bb
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/EgressRecord.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.egress;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/**
+ * A record to be written to AWS Kinesis: the payload bytes, the name of the target stream, the
+ * partition key and an optional explicit hash key.
+ *
+ * <p>Instances are created with {@link #newBuilder()}. The payload array is stored and returned
+ * as-is, without a defensive copy.
+ */
+public final class EgressRecord {
+
+  private final byte[] data;
+  private final String stream;
+  private final String partitionKey;
+  @Nullable private final String explicitHashKey;
+
+  /** @return a fresh {@link Builder} for assembling an {@link EgressRecord}. */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  private EgressRecord(Builder builder) {
+    // Required fields are checked in this order, so when several are missing the data bytes
+    // are reported first, then the target stream, then the partition key.
+    this.data = Objects.requireNonNull(builder.payload, "data bytes");
+    this.stream = Objects.requireNonNull(builder.targetStream, "target stream");
+    this.partitionKey = Objects.requireNonNull(builder.key, "partition key");
+    this.explicitHashKey = builder.hashKey;
+  }
+
+  /** @return the payload bytes to write; this is the stored array itself, not a copy. */
+  public byte[] getData() {
+    return data;
+  }
+
+  /** @return the name of the AWS Kinesis stream to write to. */
+  public String getStream() {
+    return stream;
+  }
+
+  /** @return the partition key to use when writing the record to AWS Kinesis. */
+  public String getPartitionKey() {
+    return partitionKey;
+  }
+
+  /** @return the explicit hash key to use when writing the record, or {@code null} if unset. */
+  @Nullable
+  public String getExplicitHashKey() {
+    return explicitHashKey;
+  }
+
+  /**
+   * Builder for {@link EgressRecord}. The data, stream and partition key are required; the
+   * explicit hash key is optional.
+   */
+  public static final class Builder {
+    private byte[] payload;
+    private String targetStream;
+    private String key;
+    @Nullable private String hashKey;
+
+    private Builder() {}
+
+    /** Sets the payload bytes (required). The array is not copied. */
+    public Builder withData(byte[] data) {
+      payload = data;
+      return this;
+    }
+
+    /** Sets the name of the target AWS Kinesis stream (required). */
+    public Builder withStream(String stream) {
+      targetStream = stream;
+      return this;
+    }
+
+    /** Sets the partition key (required). */
+    public Builder withPartitionKey(String partitionKey) {
+      key = partitionKey;
+      return this;
+    }
+
+    /** Sets the explicit hash key (optional). */
+    public Builder withExplicitHashKey(String explicitHashKey) {
+      hashKey = explicitHashKey;
+      return this;
+    }
+
+    /**
+     * @return a new {@link EgressRecord} holding the values set so far.
+     * @throws NullPointerException if the data, stream or partition key has not been set.
+     */
+    public EgressRecord build() {
+      return new EgressRecord(this);
+    }
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressBuilder.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressBuilder.java
new file mode 100644
index 0000000..98406c7
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressBuilder.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.egress;
+
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+/**
+ * A builder for creating an {@link EgressSpec} for writing data to AWS Kinesis.
+ *
+ * @param <T> The type written to AWS Kinesis.
+ */
+public final class KinesisEgressBuilder<T> {
+
+  private final EgressIdentifier<T> id;
+
+  private Class<? extends KinesisEgressSerializer<T>> serializerClass;
+  private int maxOutstandingRecords = 1000;
+  private AwsRegion awsRegion = AwsRegion.fromDefaultProviderChain();
+  private AwsCredentials awsCredentials = AwsCredentials.fromDefaultProviderChain();
+  private final Properties clientConfigurationProperties = new Properties();
+
+  private KinesisEgressBuilder(EgressIdentifier<T> id) {
+    this.id = Objects.requireNonNull(id);
+  }
+
+  /**
+   * @param id A unique egress identifier.
+   * @param <T> The type written to AWS Kinesis.
+   * @return A new {@link KinesisEgressBuilder}.
+   */
+  public static <T> KinesisEgressBuilder<T> forIdentifier(EgressIdentifier<T> id) {
+    return new KinesisEgressBuilder<>(id);
+  }
+
+  /**
+   * Sets the serializer for this egress. Setting a serializer is required; {@link #build()} fails
+   * if none was set.
+   *
+   * @param serializerClass The serializer used to convert from Java objects to Kinesis's byte
+   *     messages.
+   */
+  public KinesisEgressBuilder<T> withSerializer(
+      Class<? extends KinesisEgressSerializer<T>> serializerClass) {
+    this.serializerClass = Objects.requireNonNull(serializerClass);
+    return this;
+  }
+
+  /**
+   * The AWS region to connect to. By default, AWS's default provider chain is consulted.
+   *
+   * @param awsRegion The AWS region to connect to.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-region-selection.html#automatically-determine-the-aws-region-from-the-environment">Automatically
+   *     Determine the AWS Region from the Environment</a>.
+   * @see AwsRegion
+   */
+  public KinesisEgressBuilder<T> withAwsRegion(AwsRegion awsRegion) {
+    this.awsRegion = Objects.requireNonNull(awsRegion);
+    return this;
+  }
+
+  /**
+   * The AWS region to connect to, specified by the AWS region's unique id. By default, AWS's
+   * default provider chain is consulted.
+   *
+   * @param regionName The unique id of the AWS region to connect to.
+   */
+  public KinesisEgressBuilder<T> withAwsRegion(String regionName) {
+    this.awsRegion = AwsRegion.ofId(Objects.requireNonNull(regionName));
+    return this;
+  }
+
+  /**
+   * The AWS credentials to use. By default, AWS's default provider chain is consulted.
+   *
+   * @param awsCredentials The AWS credentials to use.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default">Using
+   *     the Default Credential Provider Chain</a>.
+   * @see AwsCredentials
+   */
+  public KinesisEgressBuilder<T> withAwsCredentials(AwsCredentials awsCredentials) {
+    this.awsCredentials = Objects.requireNonNull(awsCredentials);
+    return this;
+  }
+
+  /**
+   * The maximum number of buffered outstanding records, before backpressure is applied by the
+   * egress. Defaults to 1000.
+   *
+   * @param maxOutstandingRecords the maximum number of buffered outstanding records; must be
+   *     larger than 0.
+   * @throws IllegalArgumentException if {@code maxOutstandingRecords} is not larger than 0.
+   */
+  public KinesisEgressBuilder<T> withMaxOutstandingRecords(int maxOutstandingRecords) {
+    if (maxOutstandingRecords <= 0) {
+      throw new IllegalArgumentException("Max outstanding records must be larger than 0.");
+    }
+    this.maxOutstandingRecords = maxOutstandingRecords;
+    return this;
+  }
+
+  /**
+   * Sets an AWS client configuration property to be used by the egress.
+   *
+   * <p>Supported values are properties of AWS's <a
+   * href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/latest/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration</a>.
+   * Please see <a
+   * href="https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties">Default
+   * Configuration Properties</a> for a full list of the keys.
+   *
+   * @param key the property to set.
+   * @param value the value for the property.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.amazonaws.ClientConfiguration</a>.
+   */
+  public KinesisEgressBuilder<T> withClientConfigurationProperty(String key, String value) {
+    Objects.requireNonNull(key);
+    Objects.requireNonNull(value);
+    this.clientConfigurationProperties.setProperty(key, value);
+    return this;
+  }
+
+  /**
+   * @return A new {@link KinesisEgressSpec}.
+   * @throws NullPointerException if no serializer was set with {@link #withSerializer(Class)}.
+   */
+  public KinesisEgressSpec<T> build() {
+    // Fail early, with an actionable message, when the required serializer is missing.
+    Objects.requireNonNull(
+        serializerClass, "A serializer must be set with withSerializer(...) before build().");
+    return new KinesisEgressSpec<>(
+        id,
+        serializerClass,
+        maxOutstandingRecords,
+        awsRegion,
+        awsCredentials,
+        clientConfigurationProperties);
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
similarity index 58%
copy from statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
copy to statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
index 4b57b0a..46f605d 100644
--- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
@@ -15,14 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.statefun.sdk.kinesis;
+package org.apache.flink.statefun.sdk.kinesis.egress;
 
-import org.apache.flink.statefun.sdk.IngressType;
+import java.io.Serializable;
 
-public final class KinesisIOTypes {
-
-  private KinesisIOTypes() {}
+/**
+ * Defines how to serialize values of type {@code T} into {@link EgressRecord}s to be written to AWS
+ * Kinesis.
+ *
+ * @param <T> the type of values being written.
+ */
+public interface KinesisEgressSerializer<T> extends Serializable {
 
-  public static final IngressType UNIVERSAL_INGRESS_TYPE =
-      new IngressType("statefun.kinesis.io", "universal-ingress");
+  /**
+   * Serialize an output value into a {@link EgressRecord} to be written to AWS Kinesis.
+   *
+   * @param value the output value to write.
+   * @return a {@link EgressRecord} to be written to AWS Kinesis.
+   */
+  EgressRecord serialize(T value);
 }
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSpec.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSpec.java
new file mode 100644
index 0000000..7f6b369
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSpec.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.egress;
+
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.EgressType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+/**
+ * An {@link EgressSpec} for an egress that writes to AWS Kinesis. Instances are created with
+ * {@link KinesisEgressBuilder}.
+ *
+ * @param <T> The type written to AWS Kinesis.
+ */
+public final class KinesisEgressSpec<T> implements EgressSpec<T> {
+  private final EgressIdentifier<T> egressIdentifier;
+  private final Class<? extends KinesisEgressSerializer<T>> serializerClass;
+  private final int maxOutstandingRecords;
+  private final AwsRegion awsRegion;
+  private final AwsCredentials awsCredentials;
+  private final Properties clientConfigurationProperties;
+
+  KinesisEgressSpec(
+      EgressIdentifier<T> egressIdentifier,
+      Class<? extends KinesisEgressSerializer<T>> serializerClass,
+      int maxOutstandingRecords,
+      AwsRegion awsRegion,
+      AwsCredentials awsCredentials,
+      Properties clientConfigurationProperties) {
+    this.egressIdentifier = Objects.requireNonNull(egressIdentifier);
+    this.serializerClass = Objects.requireNonNull(serializerClass);
+    this.maxOutstandingRecords = maxOutstandingRecords;
+    this.awsRegion = Objects.requireNonNull(awsRegion);
+    this.awsCredentials = Objects.requireNonNull(awsCredentials);
+    // Snapshot the properties: the caller (e.g. a builder that is reused after build()) may keep
+    // mutating the instance it passed in, and those changes must not leak into this spec.
+    this.clientConfigurationProperties = new Properties();
+    this.clientConfigurationProperties.putAll(
+        Objects.requireNonNull(clientConfigurationProperties));
+  }
+
+  @Override
+  public EgressIdentifier<T> id() {
+    return egressIdentifier;
+  }
+
+  @Override
+  public EgressType type() {
+    return KinesisIOTypes.UNIVERSAL_EGRESS_TYPE;
+  }
+
+  /** @return the class of the serializer that converts values into {@link EgressRecord}s. */
+  public Class<? extends KinesisEgressSerializer<T>> serializerClass() {
+    return serializerClass;
+  }
+
+  /** @return the maximum number of buffered outstanding records before backpressure applies. */
+  public int maxOutstandingRecords() {
+    return maxOutstandingRecords;
+  }
+
+  /** @return the AWS region to connect to. */
+  public AwsRegion awsRegion() {
+    return awsRegion;
+  }
+
+  /** @return the AWS credentials to use. */
+  public AwsCredentials awsCredentials() {
+    return awsCredentials;
+  }
+
+  /**
+   * @return the client configuration properties of this egress. The returned instance is owned by
+   *     this spec and should not be modified.
+   */
+  public Properties clientConfigurationProperties() {
+    return clientConfigurationProperties;
+  }
+}
diff --git a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisEgressBuilderTest.java b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisEgressBuilderTest.java
new file mode 100644
index 0000000..e4406c4
--- /dev/null
+++ b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisEgressBuilderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
+import org.junit.Test;
+
+/** Tests for {@link KinesisEgressBuilder}. */
+public class KinesisEgressBuilderTest {
+
+  private static final EgressIdentifier<String> ID =
+      new EgressIdentifier<>("namespace", "name", String.class);
+
+  @Test
+  public void exampleUsage() {
+    final KinesisEgressSpec<String> kinesisEgressSpec =
+        KinesisEgressBuilder.forIdentifier(ID).withSerializer(TestSerializer.class).build();
+
+    assertThat(kinesisEgressSpec.id(), is(ID));
+    assertThat(kinesisEgressSpec.type(), is(KinesisIOTypes.UNIVERSAL_EGRESS_TYPE));
+    assertTrue(kinesisEgressSpec.awsRegion().isDefault());
+    assertTrue(kinesisEgressSpec.awsCredentials().isDefault());
+    assertEquals(TestSerializer.class, kinesisEgressSpec.serializerClass());
+    assertEquals(1000, kinesisEgressSpec.maxOutstandingRecords());
+    assertTrue(kinesisEgressSpec.clientConfigurationProperties().isEmpty());
+  }
+
+  @Test
+  public void customizedUsage() {
+    final KinesisEgressSpec<String> kinesisEgressSpec =
+        KinesisEgressBuilder.forIdentifier(ID)
+            .withSerializer(TestSerializer.class)
+            .withMaxOutstandingRecords(42)
+            .withClientConfigurationProperty("RecordTtl", "5000")
+            .build();
+
+    assertEquals(42, kinesisEgressSpec.maxOutstandingRecords());
+    assertEquals(
+        "5000", kinesisEgressSpec.clientConfigurationProperties().getProperty("RecordTtl"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void nonPositiveMaxOutstandingRecordsIsRejected() {
+    KinesisEgressBuilder.forIdentifier(ID).withMaxOutstandingRecords(0);
+  }
+
+  private static final class TestSerializer implements KinesisEgressSerializer<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public EgressRecord serialize(String value) {
+      return null;
+    }
+  }
+}
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
index 9bd3076..694e52d 100644
--- a/tools/maven/spotbugs-exclude.xml
+++ b/tools/maven/spotbugs-exclude.xml
@@ -74,6 +74,14 @@ under the License.
         <Class name="~org\.apache.flink\.statefun\.sdk\.kinesis\.ingress\.IngressRecord\$Builder"/>
         <Bug pattern="EI_EXPOSE_REP2"/>
     </Match>
+    <Match>
+        <Class name="~org\.apache.flink\.statefun\.sdk\.kinesis\.egress\.EgressRecord"/>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
+    <Match>
+        <Class name="~org\.apache.flink\.statefun\.sdk\.kinesis\.egress\.EgressRecord\$Builder"/>
+        <Bug pattern="EI_EXPOSE_REP2"/>
+    </Match>
 
     <!-- 3rd party -->
     <Match>


[flink-statefun] 01/07: [FLINK-16124] [kinesis] Add Java Kinesis Ingress SDK classes

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 818fa819bc67716d7b9e41405668a6086c0b5b86
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 00:01:50 2020 +0800

    [FLINK-16124] [kinesis] Add Java Kinesis Ingress SDK classes
---
 .../flink/statefun/sdk/kinesis/KinesisIOTypes.java |  28 ++++
 .../statefun/sdk/kinesis/auth/AwsCredentials.java  | 132 ++++++++++++++++
 .../flink/statefun/sdk/kinesis/auth/AwsRegion.java | 126 ++++++++++++++++
 .../sdk/kinesis/ingress/IngressRecord.java         | 129 ++++++++++++++++
 .../sdk/kinesis/ingress/KinesisIngressBuilder.java | 168 +++++++++++++++++++++
 .../ingress/KinesisIngressDeserializer.java        |  37 +++++
 .../sdk/kinesis/ingress/KinesisIngressSpec.java    |  94 ++++++++++++
 .../ingress/KinesisIngressStartupPosition.java     | 111 ++++++++++++++
 .../sdk/kinesis/KinesisIngressBuilderTest.java     |  66 ++++++++
 tools/maven/spotbugs-exclude.xml                   |  10 ++
 10 files changed, 901 insertions(+)

diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
new file mode 100644
index 0000000..4b57b0a
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis;
+
+import org.apache.flink.statefun.sdk.IngressType;
+
+public final class KinesisIOTypes {
+
+  private KinesisIOTypes() {}
+
+  public static final IngressType UNIVERSAL_INGRESS_TYPE =
+      new IngressType("statefun.kinesis.io", "universal-ingress");
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsCredentials.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsCredentials.java
new file mode 100644
index 0000000..0d9f5e9
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsCredentials.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.auth;
+
+import java.util.Objects;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+/**
+ * AWS credentials to use for connecting to AWS Kinesis.
+ *
+ * <p>Create instances with one of the static factory methods. The chosen variant can be inspected
+ * with {@link #isDefault()}, {@link #isBasic()} and {@link #isProfile()}, and unwrapped with
+ * {@link #asBasic()} or {@link #asProfile()}.
+ */
+public abstract class AwsCredentials {
+
+  // Private so that the nested classes below are the only possible subclasses.
+  private AwsCredentials() {}
+
+  /** Consults AWS's default provider chain to determine the AWS credentials. */
+  public static AwsCredentials fromDefaultProviderChain() {
+    return DefaultAwsCredentials.INSTANCE;
+  }
+
+  /**
+   * Specifies the AWS credentials directly with provided access key ID and secret access key
+   * strings.
+   *
+   * @throws NullPointerException if either argument is {@code null}.
+   */
+  public static AwsCredentials basic(String accessKeyId, String secretAccessKey) {
+    return new BasicAwsCredentials(accessKeyId, secretAccessKey);
+  }
+
+  /**
+   * Specifies the AWS credentials using an AWS configuration profile.
+   *
+   * @throws NullPointerException if {@code profileName} is {@code null}.
+   */
+  public static AwsCredentials profile(String profileName) {
+    return new ProfileAwsCredentials(profileName, null);
+  }
+
+  /**
+   * Specifies the AWS credentials using an AWS configuration profile, along with the profile's
+   * configuration path.
+   *
+   * @throws NullPointerException if {@code profileName} is {@code null}.
+   */
+  public static AwsCredentials profile(String profileName, String profilePath) {
+    return new ProfileAwsCredentials(profileName, profilePath);
+  }
+
+  /** Checks whether these credentials are obtained from AWS's default provider chain. */
+  public final boolean isDefault() {
+    return getClass() == DefaultAwsCredentials.class;
+  }
+
+  /**
+   * Checks whether these credentials are specified using directly provided access key ID and
+   * secret access key strings.
+   */
+  public final boolean isBasic() {
+    return getClass() == BasicAwsCredentials.class;
+  }
+
+  /** Checks whether these credentials are configured using an AWS configuration profile. */
+  public final boolean isProfile() {
+    return getClass() == ProfileAwsCredentials.class;
+  }
+
+  /**
+   * Returns this as a {@link BasicAwsCredentials}.
+   *
+   * @throws IllegalStateException if {@link #isBasic()} is {@code false}.
+   */
+  public final BasicAwsCredentials asBasic() {
+    if (!isBasic()) {
+      throw new IllegalStateException(
+          "This AWS credential is not defined with basic access key id and secret key.");
+    }
+    return (BasicAwsCredentials) this;
+  }
+
+  /**
+   * Returns this as a {@link ProfileAwsCredentials}.
+   *
+   * @throws IllegalStateException if {@link #isProfile()} is {@code false}.
+   */
+  public final ProfileAwsCredentials asProfile() {
+    if (!isProfile()) {
+      throw new IllegalStateException(
+          "This AWS credential is not defined with a AWS configuration profile");
+    }
+    return (ProfileAwsCredentials) this;
+  }
+
+  /** Marker for credentials to be resolved through AWS's default provider chain. Singleton. */
+  public static final class DefaultAwsCredentials extends AwsCredentials {
+    private static final DefaultAwsCredentials INSTANCE = new DefaultAwsCredentials();
+
+    // Private so that fromDefaultProviderChain() is the only way to obtain an instance.
+    private DefaultAwsCredentials() {}
+  }
+
+  /** Credentials given directly as an access key ID / secret access key pair. */
+  public static final class BasicAwsCredentials extends AwsCredentials {
+    private final String accessKeyId;
+    private final String secretAccessKey;
+
+    BasicAwsCredentials(String accessKeyId, String secretAccessKey) {
+      this.accessKeyId = Objects.requireNonNull(accessKeyId);
+      this.secretAccessKey = Objects.requireNonNull(secretAccessKey);
+    }
+
+    public String accessKeyId() {
+      return accessKeyId;
+    }
+
+    public String secretAccessKey() {
+      return secretAccessKey;
+    }
+
+    // toString() is deliberately not overridden, so printing this object does not show the secret.
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof BasicAwsCredentials)) {
+        return false;
+      }
+      BasicAwsCredentials that = (BasicAwsCredentials) obj;
+      return accessKeyId.equals(that.accessKeyId)
+          && secretAccessKey.equals(that.secretAccessKey);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(accessKeyId, secretAccessKey);
+    }
+  }
+
+  /** Credentials read from a named AWS configuration profile, optionally at a custom path. */
+  public static final class ProfileAwsCredentials extends AwsCredentials {
+    private final String profileName;
+    @Nullable private final String profilePath;
+
+    ProfileAwsCredentials(String profileName, @Nullable String profilePath) {
+      this.profileName = Objects.requireNonNull(profileName);
+      this.profilePath = profilePath;
+    }
+
+    public String name() {
+      return profileName;
+    }
+
+    /** @return the profile configuration path, or empty if none was given. */
+    public Optional<String> path() {
+      return Optional.ofNullable(profilePath);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof ProfileAwsCredentials)) {
+        return false;
+      }
+      ProfileAwsCredentials that = (ProfileAwsCredentials) obj;
+      return profileName.equals(that.profileName)
+          && Objects.equals(profilePath, that.profilePath);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(profileName, profilePath);
+    }
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsRegion.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsRegion.java
new file mode 100644
index 0000000..511aeec
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/auth/AwsRegion.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.auth;
+
+import java.net.URI;
+import java.util.Objects;
+
+/**
+ * AWS region to use for connecting to AWS Kinesis.
+ *
+ * <p>Create instances with one of the static factory methods. The chosen variant can be inspected
+ * with {@link #isDefault()}, {@link #isId()} and {@link #isCustomEndpoint()}, and unwrapped with
+ * {@link #asId()} or {@link #asCustomEndpoint()}.
+ */
+public abstract class AwsRegion {
+
+  // Private so that the nested classes below are the only possible subclasses.
+  private AwsRegion() {}
+
+  /** Consults AWS's default provider chain to determine the AWS region. */
+  public static AwsRegion fromDefaultProviderChain() {
+    return DefaultAwsRegion.INSTANCE;
+  }
+
+  /**
+   * Specifies an AWS region using the region's unique id.
+   *
+   * @throws NullPointerException if {@code id} is {@code null}.
+   */
+  public static AwsRegion ofId(String id) {
+    return new SpecificIdAwsRegion(id);
+  }
+
+  /**
+   * Connects to an AWS region through a non-standard AWS service endpoint. This is typically used
+   * only for development and testing purposes.
+   *
+   * @param serviceEndpoint URL of the service endpoint; must use the {@code https} scheme.
+   * @param regionId the unique id of the AWS region.
+   * @throws NullPointerException if either argument is {@code null}.
+   * @throws IllegalArgumentException if {@code serviceEndpoint} cannot be parsed as a URI or does
+   *     not use the {@code https} scheme.
+   */
+  public static AwsRegion ofCustomEndpoint(String serviceEndpoint, String regionId) {
+    return new CustomEndpointAwsRegion(serviceEndpoint, regionId);
+  }
+
+  /** Checks whether the region is configured to be obtained from AWS's default provider chain. */
+  public final boolean isDefault() {
+    return getClass() == DefaultAwsRegion.class;
+  }
+
+  /** Checks whether the region is specified with the region's unique id. */
+  public final boolean isId() {
+    return getClass() == SpecificIdAwsRegion.class;
+  }
+
+  /** Checks whether the region is specified with a custom non-standard AWS service endpoint. */
+  public final boolean isCustomEndpoint() {
+    return getClass() == CustomEndpointAwsRegion.class;
+  }
+
+  /**
+   * Returns this region as a {@link SpecificIdAwsRegion}.
+   *
+   * @throws IllegalStateException if {@link #isId()} is {@code false}.
+   */
+  public final SpecificIdAwsRegion asId() {
+    if (!isId()) {
+      throw new IllegalStateException(
+          "This is not an AWS region specified with using the region's unique id.");
+    }
+    return (SpecificIdAwsRegion) this;
+  }
+
+  /**
+   * Returns this region as a {@link CustomEndpointAwsRegion}.
+   *
+   * @throws IllegalStateException if {@link #isCustomEndpoint()} is {@code false}.
+   */
+  public final CustomEndpointAwsRegion asCustomEndpoint() {
+    if (!isCustomEndpoint()) {
+      throw new IllegalStateException(
+          "This is not an AWS region specified with a custom endpoint.");
+    }
+    return (CustomEndpointAwsRegion) this;
+  }
+
+  /** Marker for a region to be resolved through AWS's default provider chain. Singleton. */
+  public static final class DefaultAwsRegion extends AwsRegion {
+    private static final DefaultAwsRegion INSTANCE = new DefaultAwsRegion();
+
+    // Private so that fromDefaultProviderChain() is the only way to obtain an instance.
+    private DefaultAwsRegion() {}
+  }
+
+  /** A region identified by its unique id. */
+  public static final class SpecificIdAwsRegion extends AwsRegion {
+    private final String regionId;
+
+    SpecificIdAwsRegion(String regionId) {
+      this.regionId = Objects.requireNonNull(regionId);
+    }
+
+    public String id() {
+      return regionId;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (!(obj instanceof SpecificIdAwsRegion)) {
+        return false;
+      }
+      return regionId.equals(((SpecificIdAwsRegion) obj).regionId);
+    }
+
+    @Override
+    public int hashCode() {
+      return regionId.hashCode();
+    }
+  }
+
+  /** A region reached through a custom {@code https} service endpoint. */
+  public static final class CustomEndpointAwsRegion extends AwsRegion {
+    private final String serviceEndpoint;
+    private final String regionId;
+
+    CustomEndpointAwsRegion(String serviceEndpoint, String regionId) {
+      this.serviceEndpoint = requireValidEndpoint(serviceEndpoint);
+      this.regionId = Objects.requireNonNull(regionId);
+    }
+
+    public String serviceEndpoint() {
+      return serviceEndpoint;
+    }
+
+    public String regionId() {
+      return regionId;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (!(obj instanceof CustomEndpointAwsRegion)) {
+        return false;
+      }
+      CustomEndpointAwsRegion that = (CustomEndpointAwsRegion) obj;
+      return serviceEndpoint.equals(that.serviceEndpoint) && regionId.equals(that.regionId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(serviceEndpoint, regionId);
+    }
+
+    private static String requireValidEndpoint(String serviceEndpoint) {
+      Objects.requireNonNull(serviceEndpoint);
+
+      // URI.create rejects malformed input with IllegalArgumentException. A relative URI such
+      // as "my-host" has no scheme at all, so guard against null before comparing.
+      final String scheme = URI.create(serviceEndpoint).getScheme();
+      if (scheme == null || !scheme.equalsIgnoreCase("https")) {
+        throw new IllegalArgumentException(
+            "Invalid service endpoint url: "
+                + serviceEndpoint
+                + "; Only custom service endpoints using HTTPS are supported");
+      }
+
+      return serviceEndpoint;
+    }
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/IngressRecord.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/IngressRecord.java
new file mode 100644
index 0000000..cbfc1f7
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/IngressRecord.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.util.Objects;
+
+/**
+ * A record consumed from AWS Kinesis, as handed to a {@link KinesisIngressDeserializer}.
+ *
+ * <p>Create instances with {@link #newBuilder()}. Every field except the approximate arrival
+ * timestamp is mandatory; {@link Builder#build()} throws {@link NullPointerException} otherwise.
+ */
+public final class IngressRecord {
+  private final byte[] data;
+  private final String stream;
+  private final String shardId;
+  private final String partitionKey;
+  private final String sequenceNumber;
+  private final long approximateArrivalTimestamp;
+
+  /** @return A builder for a {@link IngressRecord}. */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  private IngressRecord(
+      byte[] data,
+      String stream,
+      String shardId,
+      String partitionKey,
+      String sequenceNumber,
+      long approximateArrivalTimestamp) {
+    this.data = Objects.requireNonNull(data, "data bytes");
+    this.stream = Objects.requireNonNull(stream, "source stream");
+    this.shardId = Objects.requireNonNull(shardId, "source shard id");
+    this.partitionKey = Objects.requireNonNull(partitionKey, "partition key");
+    this.sequenceNumber = Objects.requireNonNull(sequenceNumber, "sequence number");
+    this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+  }
+
+  /**
+   * @return consumed data bytes. The array is returned as-is (not copied), so callers should treat
+   *     it as read-only.
+   */
+  public byte[] getData() {
+    return data;
+  }
+
+  /** @return source AWS Kinesis stream */
+  public String getStream() {
+    return stream;
+  }
+
+  /** @return source AWS Kinesis stream shard */
+  public String getShardId() {
+    return shardId;
+  }
+
+  /** @return attached partition key */
+  public String getPartitionKey() {
+    return partitionKey;
+  }
+
+  /** @return sequence number of the consumed record */
+  public String getSequenceNumber() {
+    return sequenceNumber;
+  }
+
+  /**
+   * @return approximate arrival timestamp (ingestion time at AWS Kinesis) of the consumed record,
+   *     or {@code 0} if it was not set on the builder.
+   */
+  public long getApproximateArrivalTimestamp() {
+    return approximateArrivalTimestamp;
+  }
+
+  /** Builder for {@link IngressRecord}; values are only validated by {@link #build()}. */
+  public static final class Builder {
+    private byte[] data;
+    private String stream;
+    private String shardId;
+    private String partitionKey;
+    private String sequenceNumber;
+    // Defaults to 0 when withApproximateArrivalTimestamp is never called.
+    private long approximateArrivalTimestamp;
+
+    private Builder() {}
+
+    public Builder withData(byte[] data) {
+      this.data = data;
+      return this;
+    }
+
+    public Builder withStream(String stream) {
+      this.stream = stream;
+      return this;
+    }
+
+    public Builder withShardId(String shardId) {
+      this.shardId = shardId;
+      return this;
+    }
+
+    public Builder withPartitionKey(String partitionKey) {
+      this.partitionKey = partitionKey;
+      return this;
+    }
+
+    public Builder withSequenceNumber(String sequenceNumber) {
+      this.sequenceNumber = sequenceNumber;
+      return this;
+    }
+
+    public Builder withApproximateArrivalTimestamp(long approximateArrivalTimestamp) {
+      this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+      return this;
+    }
+
+    /**
+     * @return a new {@link IngressRecord} with the values set so far.
+     * @throws NullPointerException if the data, stream, shard id, partition key or sequence
+     *     number has not been set.
+     */
+    public IngressRecord build() {
+      return new IngressRecord(
+          data, stream, shardId, partitionKey, sequenceNumber, approximateArrivalTimestamp);
+    }
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
new file mode 100644
index 0000000..2d71ae6
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+/**
+ * A builder for creating an {@link IngressSpec} for consuming data from AWS Kinesis.
+ *
+ * <p>At least one stream and a deserializer must be configured before calling {@link #build()}.
+ * The builder may be reused: each {@link #build()} call snapshots the current configuration.
+ *
+ * @param <T> The type consumed from AWS Kinesis.
+ */
+public final class KinesisIngressBuilder<T> {
+
+  private final IngressIdentifier<T> id;
+
+  private final List<String> streams = new ArrayList<>();
+  private Class<? extends KinesisIngressDeserializer<T>> deserializerClass;
+  private KinesisIngressStartupPosition startupPosition =
+      KinesisIngressStartupPosition.fromLatest();
+  private AwsRegion awsRegion = AwsRegion.fromDefaultProviderChain();
+  private AwsCredentials awsCredentials = AwsCredentials.fromDefaultProviderChain();
+  private final Properties clientConfigurationProperties = new Properties();
+
+  private KinesisIngressBuilder(IngressIdentifier<T> id) {
+    this.id = Objects.requireNonNull(id);
+  }
+
+  /**
+   * @param id A unique ingress identifier.
+   * @param <T> The type consumed from Kinesis.
+   * @return A new {@link KinesisIngressBuilder}.
+   */
+  public static <T> KinesisIngressBuilder<T> forIdentifier(IngressIdentifier<T> id) {
+    return new KinesisIngressBuilder<>(id);
+  }
+
+  /** @param stream The name of a stream that should be consumed; must not be {@code null}. */
+  public KinesisIngressBuilder<T> withStream(String stream) {
+    this.streams.add(Objects.requireNonNull(stream, "stream"));
+    return this;
+  }
+
+  /**
+   * @param streams A list of streams that should be consumed. Neither the list nor any of its
+   *     elements may be {@code null}.
+   */
+  public KinesisIngressBuilder<T> withStreams(List<String> streams) {
+    // Validate every name before adding any, so a bad list leaves the builder unchanged.
+    for (String stream : Objects.requireNonNull(streams, "streams")) {
+      Objects.requireNonNull(stream, "stream");
+    }
+    this.streams.addAll(streams);
+    return this;
+  }
+
+  /**
+   * @param deserializerClass The deserializer used to convert between Kinesis's byte messages and
+   *     Java objects.
+   */
+  public KinesisIngressBuilder<T> withDeserializer(
+      Class<? extends KinesisIngressDeserializer<T>> deserializerClass) {
+    this.deserializerClass = Objects.requireNonNull(deserializerClass);
+    return this;
+  }
+
+  /**
+   * Configures the position that the ingress should start consuming from. By default, the startup
+   * position is {@link KinesisIngressStartupPosition#fromLatest()}.
+   *
+   * <p>Note that this configuration only affects the position when starting the application from a
+   * fresh start. When restoring the application from a savepoint, the ingress will always start
+   * consuming from the position persisted in the savepoint.
+   *
+   * @param startupPosition the position that the Kinesis ingress should start consuming from.
+   * @see KinesisIngressStartupPosition
+   */
+  public KinesisIngressBuilder<T> withStartupPosition(
+      KinesisIngressStartupPosition startupPosition) {
+    this.startupPosition = Objects.requireNonNull(startupPosition);
+    return this;
+  }
+
+  /**
+   * The AWS region to connect to. By default, AWS's default provider chain is consulted.
+   *
+   * @param awsRegion The AWS region to connect to.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-region-selection.html#automatically-determine-the-aws-region-from-the-environment">Automatically
+   *     Determine the AWS Region from the Environment</a>.
+   * @see AwsRegion
+   */
+  public KinesisIngressBuilder<T> withAwsRegion(AwsRegion awsRegion) {
+    this.awsRegion = Objects.requireNonNull(awsRegion);
+    return this;
+  }
+
+  /**
+   * The AWS region to connect to, specified by the AWS region's unique id. By default, AWS's
+   * default provider chain is consulted.
+   *
+   * @param regionName The unique id of the AWS region to connect to.
+   */
+  public KinesisIngressBuilder<T> withAwsRegion(String regionName) {
+    this.awsRegion = AwsRegion.ofId(regionName);
+    return this;
+  }
+
+  /**
+   * The AWS credentials to use. By default, AWS's default provider chain is consulted.
+   *
+   * @param awsCredentials The AWS credentials to use.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default">Using
+   *     the Default Credential Provider Chain</a>.
+   * @see AwsCredentials
+   */
+  public KinesisIngressBuilder<T> withAwsCredentials(AwsCredentials awsCredentials) {
+    this.awsCredentials = Objects.requireNonNull(awsCredentials);
+    return this;
+  }
+
+  /**
+   * Sets an AWS client configuration property to be used by the ingress.
+   *
+   * <p>Supported values are properties of AWS's <a
+   * href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.amazonaws.ClientConfiguration</a>.
+   * For example, to set a value for {@code SOCKET_TIMEOUT}, the property key would be {@code
+   * SocketTimeout}.
+   *
+   * @param key the property to set.
+   * @param value the value for the property.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.amazonaws.ClientConfiguration</a>.
+   */
+  public KinesisIngressBuilder<T> withClientConfigurationProperty(String key, String value) {
+    Objects.requireNonNull(key);
+    Objects.requireNonNull(value);
+    this.clientConfigurationProperties.setProperty(key, value);
+    return this;
+  }
+
+  /**
+   * @return A new {@link KinesisIngressSpec}.
+   * @throws NullPointerException if no deserializer has been configured.
+   * @throws IllegalArgumentException if no stream has been configured.
+   */
+  public KinesisIngressSpec<T> build() {
+    // Hand the spec copies of the mutable collections, so that configuring this builder further
+    // (or building again) cannot change a spec that was already built.
+    final Properties properties = new Properties();
+    properties.putAll(clientConfigurationProperties);
+    return new KinesisIngressSpec<>(
+        id,
+        new ArrayList<>(streams),
+        deserializerClass,
+        startupPosition,
+        awsRegion,
+        awsCredentials,
+        properties);
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
new file mode 100644
index 0000000..226d0fb
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.io.Serializable;
+
+/**
+ * Converts {@link IngressRecord}s consumed from AWS Kinesis into the data types processed by the
+ * system.
+ *
+ * <p>Note that {@code KinesisIngressBuilder#withDeserializer} takes the implementing class rather
+ * than an instance.
+ *
+ * @param <T> The type produced by this deserializer.
+ */
+public interface KinesisIngressDeserializer<T> extends Serializable {
+
+  /**
+   * Converts a single record consumed from AWS Kinesis.
+   *
+   * @param ingressRecord the consumed record, carrying both its payload and its Kinesis metadata.
+   * @return the deserialized data object.
+   */
+  T deserialize(IngressRecord ingressRecord);
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
new file mode 100644
index 0000000..9bc4a9f
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+/**
+ * An {@link IngressSpec} describing an AWS Kinesis ingress. Create instances with {@link
+ * KinesisIngressBuilder}.
+ *
+ * @param <T> The type consumed from AWS Kinesis.
+ */
+public final class KinesisIngressSpec<T> implements IngressSpec<T> {
+  private final IngressIdentifier<T> ingressIdentifier;
+  private final List<String> streams;
+  private final Class<? extends KinesisIngressDeserializer<T>> deserializerClass;
+  private final KinesisIngressStartupPosition startupPosition;
+  private final AwsRegion awsRegion;
+  private final AwsCredentials awsCredentials;
+  private final Properties clientConfigurationProperties;
+
+  KinesisIngressSpec(
+      IngressIdentifier<T> ingressIdentifier,
+      List<String> streams,
+      Class<? extends KinesisIngressDeserializer<T>> deserializerClass,
+      KinesisIngressStartupPosition startupPosition,
+      AwsRegion awsRegion,
+      AwsCredentials awsCredentials,
+      Properties clientConfigurationProperties) {
+    this.ingressIdentifier = Objects.requireNonNull(ingressIdentifier, "ingress identifier");
+    this.deserializerClass = Objects.requireNonNull(deserializerClass, "deserializer class");
+    this.startupPosition = Objects.requireNonNull(startupPosition, "startup position");
+    this.awsRegion = Objects.requireNonNull(awsRegion, "AWS region configuration");
+    this.awsCredentials = Objects.requireNonNull(awsCredentials, "AWS credentials configuration");
+
+    // Copy the mutable arguments so that later changes by the caller (e.g. a reused builder)
+    // do not leak into this spec.
+    this.clientConfigurationProperties = new Properties();
+    this.clientConfigurationProperties.putAll(
+        Objects.requireNonNull(clientConfigurationProperties));
+
+    this.streams = new ArrayList<>(Objects.requireNonNull(streams, "AWS Kinesis stream names"));
+    if (this.streams.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Must have at least one stream to consume from specified.");
+    }
+  }
+
+  @Override
+  public IngressIdentifier<T> id() {
+    return ingressIdentifier;
+  }
+
+  @Override
+  public IngressType type() {
+    return KinesisIOTypes.UNIVERSAL_INGRESS_TYPE;
+  }
+
+  /** @return the names of the streams to consume; never empty. */
+  public List<String> streams() {
+    return streams;
+  }
+
+  public Class<? extends KinesisIngressDeserializer<T>> deserializerClass() {
+    return deserializerClass;
+  }
+
+  public KinesisIngressStartupPosition startupPosition() {
+    return startupPosition;
+  }
+
+  public AwsRegion awsRegion() {
+    return awsRegion;
+  }
+
+  public AwsCredentials awsCredentials() {
+    return awsCredentials;
+  }
+
+  /** @return the AWS client configuration properties for this ingress. */
+  public Properties clientConfigurationProperties() {
+    return clientConfigurationProperties;
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressStartupPosition.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressStartupPosition.java
new file mode 100644
index 0000000..58aec1e
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressStartupPosition.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.ingress;
+
+import java.time.ZonedDateTime;
+import java.util.Objects;
+
+/**
+ * Position for the ingress to start consuming AWS Kinesis shards.
+ *
+ * <p>Use {@link #fromEarliest()}, {@link #fromLatest()} or {@link #fromDate(ZonedDateTime)} to
+ * create a position.
+ */
+public abstract class KinesisIngressStartupPosition {
+
+  // Private so that the nested classes below are the only possible subclasses.
+  private KinesisIngressStartupPosition() {}
+
+  /** Start consuming from the earliest position possible. */
+  public static KinesisIngressStartupPosition fromEarliest() {
+    return EarliestPosition.INSTANCE;
+  }
+
+  /** Start consuming from the latest position, i.e. head of the stream shards. */
+  public static KinesisIngressStartupPosition fromLatest() {
+    return LatestPosition.INSTANCE;
+  }
+
+  /**
+   * Start consuming from position with ingestion timestamps after or equal to a specified {@link
+   * ZonedDateTime}.
+   *
+   * @throws NullPointerException if {@code date} is {@code null}.
+   */
+  public static KinesisIngressStartupPosition fromDate(ZonedDateTime date) {
+    return new DatePosition(date);
+  }
+
+  /** Checks whether this position is configured using the earliest position. */
+  public final boolean isEarliest() {
+    return getClass() == EarliestPosition.class;
+  }
+
+  /** Checks whether this position is configured using the latest position. */
+  public final boolean isLatest() {
+    return getClass() == LatestPosition.class;
+  }
+
+  /** Checks whether this position is configured using a date. */
+  public final boolean isDate() {
+    return getClass() == DatePosition.class;
+  }
+
+  /**
+   * Returns this position as a {@link DatePosition}.
+   *
+   * @throws IllegalStateException if {@link #isDate()} is {@code false}.
+   */
+  public final DatePosition asDate() {
+    if (!isDate()) {
+      throw new IllegalStateException("This is not a startup position configured using a date.");
+    }
+    return (DatePosition) this;
+  }
+
+  /** Singleton marker returned by {@link #fromEarliest()}. */
+  @SuppressWarnings("WeakerAccess")
+  public static final class EarliestPosition extends KinesisIngressStartupPosition {
+    private static final EarliestPosition INSTANCE = new EarliestPosition();
+
+    private EarliestPosition() {}
+  }
+
+  /** Singleton marker returned by {@link #fromLatest()}. */
+  @SuppressWarnings("WeakerAccess")
+  public static final class LatestPosition extends KinesisIngressStartupPosition {
+    private static final LatestPosition INSTANCE = new LatestPosition();
+
+    private LatestPosition() {}
+  }
+
+  /** A position given by an ingestion timestamp; see {@link #fromDate(ZonedDateTime)}. */
+  public static final class DatePosition extends KinesisIngressStartupPosition {
+
+    private final ZonedDateTime date;
+
+    private DatePosition(ZonedDateTime date) {
+      // Kept non-null: equals() and hashCode() below dereference it.
+      this.date = Objects.requireNonNull(date, "date");
+    }
+
+    public ZonedDateTime date() {
+      return date;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (!(obj instanceof DatePosition)) {
+        return false;
+      }
+      return date.equals(((DatePosition) obj).date);
+    }
+
+    @Override
+    public int hashCode() {
+      return date.hashCode();
+    }
+  }
+}
diff --git a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
new file mode 100644
index 0000000..bd19b48
--- /dev/null
+++ b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec;
+import org.junit.Test;
+
+public class KinesisIngressBuilderTest {
+
+  private static final String STREAM_NAME = "test-stream";
+
+  private static final IngressIdentifier<String> ID =
+      new IngressIdentifier<>(String.class, "namespace", "name");
+
+  /**
+   * Builds a spec with only an identifier, a deserializer and a stream, then checks both the
+   * configured values and the defaults of every other setting.
+   */
+  @Test
+  public void exampleUsage() {
+    final KinesisIngressSpec<String> spec =
+        KinesisIngressBuilder.forIdentifier(ID)
+            .withDeserializer(TestDeserializer.class)
+            .withStream(STREAM_NAME)
+            .build();
+
+    // values that were explicitly configured
+    assertThat(spec.id(), is(ID));
+    assertThat(spec.streams(), is(Collections.singletonList(STREAM_NAME)));
+    assertEquals(TestDeserializer.class, spec.deserializerClass());
+
+    // values that were left unset
+    assertTrue(spec.awsRegion().isDefault());
+    assertTrue(spec.awsCredentials().isDefault());
+    assertTrue(spec.startupPosition().isLatest());
+    assertTrue(spec.clientConfigurationProperties().isEmpty());
+  }
+
+  /** Stub deserializer; this test only passes its class to the builder and never invokes it. */
+  private static final class TestDeserializer implements KinesisIngressDeserializer<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public String deserialize(IngressRecord ingressRecord) {
+      return null;
+    }
+  }
+}
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
index 69887fc..9bd3076 100644
--- a/tools/maven/spotbugs-exclude.xml
+++ b/tools/maven/spotbugs-exclude.xml
@@ -65,6 +65,16 @@ under the License.
         <Bug pattern="UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"/>
     </Match>
 
+    <!-- These classes allow setting / getting fields with modifiable types without a defensive copy -->
+    <Match>
+        <Class name="~org\.apache\.flink\.statefun\.sdk\.kinesis\.ingress\.IngressRecord"/>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
+    <Match>
+        <Class name="~org\.apache\.flink\.statefun\.sdk\.kinesis\.ingress\.IngressRecord\$Builder"/>
+        <Bug pattern="EI_EXPOSE_REP2"/>
+    </Match>
+
     <!-- 3rd party -->
     <Match>
         <Class name="~it\.unimi\.dsi\.fastutil\.objects\.ObjectOpenHashMap"/>


[flink-statefun] 03/07: [FLINK-16124] [kinesis] Implement runtime KinesisSourceProvider

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 2cd05447de555dc7afcc9a6cff82b2ea41b8967d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 00:05:06 2020 +0800

    [FLINK-16124] [kinesis] Implement runtime KinesisSourceProvider
---
 statefun-flink/statefun-flink-io-bundle/pom.xml    |   6 +
 .../flink/io/kinesis/AwsAuthConfigProperties.java  |  95 +++++++++++++
 .../KinesisDeserializationSchemaDelegate.java      |  63 +++++++++
 .../flink/io/kinesis/KinesisSourceProvider.java    | 121 +++++++++++++++++
 .../io/kinesis/AwsAuthConfigPropertiesTest.java    | 148 +++++++++++++++++++++
 .../io/kinesis/KinesisSourceProviderTest.java      |  66 +++++++++
 6 files changed, 499 insertions(+)

diff --git a/statefun-flink/statefun-flink-io-bundle/pom.xml b/statefun-flink/statefun-flink-io-bundle/pom.xml
index a62f41d..e9213cb 100644
--- a/statefun-flink/statefun-flink-io-bundle/pom.xml
+++ b/statefun-flink/statefun-flink-io-bundle/pom.xml
@@ -96,6 +96,12 @@ under the License.
             <version>4.12</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
new file mode 100644
index 0000000..e1dfac6
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import java.util.Locale;
+import java.util.Properties;
+import org.apache.flink.kinesis.shaded.com.amazonaws.regions.DefaultAwsRegionProviderChain;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+
+/**
+ * Translates StateFun's {@link AwsRegion} and {@link AwsCredentials} settings into the property
+ * keys of the Flink Kinesis connector ({@link AWSConfigConstants}).
+ *
+ * <ul>
+ *   <li>{@code forAwsRegion} sets {@link AWSConfigConstants#AWS_REGION} and, for a custom
+ *       endpoint, additionally {@link AWSConfigConstants#AWS_ENDPOINT}.
+ *   <li>{@code forAwsCredentials} sets {@link AWSConfigConstants#AWS_CREDENTIALS_PROVIDER} plus
+ *       the provider-specific keys (access key id and secret key, or profile name and optional
+ *       profile path).
+ * </ul>
+ *
+ * <p>Each method returns a new {@link Properties} instance and throws {@link
+ * IllegalStateException} for an unrecognized configuration type.
+ */
+final class AwsAuthConfigProperties {
+
+  private AwsAuthConfigProperties() {}
+
+  static Properties forAwsRegion(AwsRegion awsRegion) {
+    final Properties properties = new Properties();
+
+    if (awsRegion.isDefault()) {
+      // Resolved eagerly on the client side, so the connector always receives an explicit region.
+      properties.setProperty(AWSConfigConstants.AWS_REGION, regionFromDefaultProviderChain());
+    } else if (awsRegion.isId()) {
+      properties.setProperty(AWSConfigConstants.AWS_REGION, awsRegion.asId().id());
+    } else if (awsRegion.isCustomEndpoint()) {
+      // A custom endpoint needs both the full service endpoint and the region id.
+      final AwsRegion.CustomEndpointAwsRegion customEndpoint = awsRegion.asCustomEndpoint();
+      properties.setProperty(AWSConfigConstants.AWS_ENDPOINT, customEndpoint.serviceEndpoint());
+      properties.setProperty(AWSConfigConstants.AWS_REGION, customEndpoint.regionId());
+    } else {
+      throw new IllegalStateException("Unrecognized AWS region configuration type: " + awsRegion);
+    }
+
+    return properties;
+  }
+
+  static Properties forAwsCredentials(AwsCredentials awsCredentials) {
+    final Properties properties = new Properties();
+
+    if (awsCredentials.isDefault()) {
+      // Default-provider-chain credentials map to the connector's AUTO credentials provider.
+      properties.setProperty(
+          AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
+          AWSConfigConstants.CredentialProvider.AUTO.name());
+    } else if (awsCredentials.isBasic()) {
+      properties.setProperty(
+          AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
+          AWSConfigConstants.CredentialProvider.BASIC.name());
+
+      // The key id and secret are stored under keys derived from the credentials-provider key.
+      final AwsCredentials.BasicAwsCredentials basicCredentials = awsCredentials.asBasic();
+      properties.setProperty(
+          AWSConfigConstants.accessKeyId(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
+          basicCredentials.accessKeyId());
+      properties.setProperty(
+          AWSConfigConstants.secretKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
+          basicCredentials.secretAccessKey());
+    } else if (awsCredentials.isProfile()) {
+      properties.setProperty(
+          AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
+          AWSConfigConstants.CredentialProvider.PROFILE.name());
+
+      final AwsCredentials.ProfileAwsCredentials profileCredentials = awsCredentials.asProfile();
+      properties.setProperty(
+          AWSConfigConstants.profileName(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
+          profileCredentials.name());
+      // The profile path is optional and is only set when one was configured.
+      profileCredentials
+          .path()
+          .ifPresent(
+              path ->
+                  properties.setProperty(
+                      AWSConfigConstants.profilePath(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
+                      path));
+    } else {
+      throw new IllegalStateException(
+          "Unrecognized AWS credentials configuration type: " + awsCredentials);
+    }
+
+    return properties;
+  }
+
+  /**
+   * Resolves the region through the AWS SDK's {@link DefaultAwsRegionProviderChain}, lower-cased
+   * with {@link Locale#ENGLISH} so the result does not depend on the JVM's default locale.
+   *
+   * <p>NOTE(review): whether an unresolvable region surfaces as an SDK exception or as a {@code
+   * null} (and thus an NPE here) depends on the shaded AWS SDK version — confirm.
+   */
+  private static String regionFromDefaultProviderChain() {
+    return new DefaultAwsRegionProviderChain().getRegion().toLowerCase(Locale.ENGLISH);
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisDeserializationSchemaDelegate.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisDeserializationSchemaDelegate.java
new file mode 100644
index 0000000..bc17516
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisDeserializationSchemaDelegate.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import java.io.IOException;
+import java.util.Objects;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.statefun.flink.common.UnimplementedTypeInfo;
+import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+
+/**
+ * Adapts a user-provided {@link KinesisIngressDeserializer} to Flink's {@link
+ * KinesisDeserializationSchema}.
+ *
+ * <p>Each consumed record is wrapped into an {@link IngressRecord} carrying the payload, stream,
+ * shard id, partition key, sequence number and approximate arrival timestamp, and then passed to
+ * the delegate.
+ */
+final class KinesisDeserializationSchemaDelegate<T> implements KinesisDeserializationSchema<T> {
+
+  private static final long serialVersionUID = 1L;
+
+  private final TypeInformation<T> producedTypeInfo = new UnimplementedTypeInfo<>();
+  private final KinesisIngressDeserializer<T> delegate;
+
+  /** @param delegate the user deserializer to wrap; must not be {@code null}. */
+  KinesisDeserializationSchemaDelegate(KinesisIngressDeserializer<T> delegate) {
+    this.delegate = Objects.requireNonNull(delegate);
+  }
+
+  @Override
+  public T deserialize(
+      byte[] recordValue,
+      String partitionKey,
+      String seqNum,
+      long approxArrivalTimestamp,
+      String stream,
+      String shardId)
+      throws IOException {
+    final IngressRecord ingressRecord =
+        IngressRecord.newBuilder()
+            .withData(recordValue)
+            .withStream(stream)
+            .withShardId(shardId)
+            .withPartitionKey(partitionKey)
+            .withSequenceNumber(seqNum)
+            .withApproximateArrivalTimestamp(approxArrivalTimestamp)
+            .build();
+
+    return delegate.deserialize(ingressRecord);
+  }
+
+  /** Returns the {@link UnimplementedTypeInfo} instance created together with this schema. */
+  @Override
+  public TypeInformation<T> getProducedType() {
+    return producedTypeInfo;
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
new file mode 100644
index 0000000..dde1e2b
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import org.apache.flink.statefun.flink.io.common.ReflectionUtil;
+import org.apache.flink.statefun.flink.io.spi.SourceProvider;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+
+/**
+ * {@link SourceProvider} that turns a {@link KinesisIngressSpec} into a {@link
+ * FlinkKinesisConsumer} reading from the spec's streams.
+ */
+final class KinesisSourceProvider implements SourceProvider {
+
+  /**
+   * Formats startup dates for {@link ConsumerConfigConstants#STREAM_INITIAL_TIMESTAMP}. {@link
+   * DateTimeFormatter} is immutable and thread-safe, so one shared instance is sufficient.
+   */
+  private static final DateTimeFormatter STARTUP_DATE_FORMATTER =
+      DateTimeFormatter.ofPattern(ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
+
+  @Override
+  public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
+    final KinesisIngressSpec<T> kinesisIngressSpec = asKinesisSpec(spec);
+
+    return new FlinkKinesisConsumer<>(
+        kinesisIngressSpec.streams(),
+        deserializationSchemaFromSpec(kinesisIngressSpec),
+        propertiesFromSpec(kinesisIngressSpec));
+  }
+
+  /**
+   * Narrows a generic ingress spec to a {@link KinesisIngressSpec}.
+   *
+   * @throws NullPointerException if {@code spec} is {@code null}.
+   * @throws IllegalArgumentException if {@code spec} is not a {@link KinesisIngressSpec}.
+   */
+  private static <T> KinesisIngressSpec<T> asKinesisSpec(IngressSpec<T> spec) {
+    if (spec == null) {
+      throw new NullPointerException("Unable to translate a NULL spec");
+    }
+    if (spec instanceof KinesisIngressSpec) {
+      return (KinesisIngressSpec<T>) spec;
+    }
+    throw new IllegalArgumentException(String.format("Wrong type %s", spec.type()));
+  }
+
+  /** Reflectively instantiates the spec's deserializer class and adapts it for the consumer. */
+  private static <T> KinesisDeserializationSchema<T> deserializationSchemaFromSpec(
+      KinesisIngressSpec<T> spec) {
+    KinesisIngressDeserializer<T> ingressDeserializer =
+        ReflectionUtil.instantiate(spec.deserializerClass());
+    return new KinesisDeserializationSchemaDelegate<>(ingressDeserializer);
+  }
+
+  /**
+   * Assembles the consumer configuration: prefixed client properties first, then region and
+   * credential properties (which win on key collisions because they are added later), then the
+   * startup position.
+   */
+  private static Properties propertiesFromSpec(KinesisIngressSpec<?> spec) {
+    final Properties properties = new Properties();
+
+    properties.putAll(resolveClientProperties(spec.clientConfigurationProperties()));
+    properties.putAll(AwsAuthConfigProperties.forAwsRegion(spec.awsRegion()));
+    properties.putAll(AwsAuthConfigProperties.forAwsCredentials(spec.awsCredentials()));
+
+    setStartupPositionProperties(properties, spec.startupPosition());
+
+    return properties;
+  }
+
+  /**
+   * Rewrites each user-supplied client configuration key into the connector's client-config
+   * namespace, e.g. {@code MaxConnections} becomes {@code AWS_CLIENT_CONFIG_PREFIX +
+   * "maxConnections"}.
+   */
+  private static Properties resolveClientProperties(Properties clientConfigurationProperties) {
+    final Properties resolvedProps = new Properties();
+    for (String property : clientConfigurationProperties.stringPropertyNames()) {
+      resolvedProps.setProperty(
+          asFlinkConsumerClientPropertyKey(property),
+          clientConfigurationProperties.getProperty(property));
+    }
+    return resolvedProps;
+  }
+
+  /** Maps the spec's startup position onto the consumer's initial-position properties. */
+  private static void setStartupPositionProperties(
+      Properties properties, KinesisIngressStartupPosition startupPosition) {
+    if (startupPosition.isEarliest()) {
+      properties.setProperty(
+          ConsumerConfigConstants.STREAM_INITIAL_POSITION,
+          ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
+    } else if (startupPosition.isLatest()) {
+      properties.setProperty(
+          ConsumerConfigConstants.STREAM_INITIAL_POSITION,
+          ConsumerConfigConstants.InitialPosition.LATEST.name());
+    } else if (startupPosition.isDate()) {
+      properties.setProperty(
+          ConsumerConfigConstants.STREAM_INITIAL_POSITION,
+          ConsumerConfigConstants.InitialPosition.AT_TIMESTAMP.name());
+
+      final ZonedDateTime startupDate = startupPosition.asDate().date();
+      properties.setProperty(
+          ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+          startupDate.format(STARTUP_DATE_FORMATTER));
+    } else {
+      throw new IllegalStateException(
+          "Unrecognized ingress startup position type: " + startupPosition);
+    }
+  }
+
+  // NOTE(review): the first letter is lower-cased, presumably to match the bean-property style
+  // names the connector expects after the prefix — confirm.
+  private static String asFlinkConsumerClientPropertyKey(String key) {
+    return AWSUtil.AWS_CLIENT_CONFIG_PREFIX + lowercaseFirstLetter(key);
+  }
+
+  /**
+   * Lower-cases the first character of {@code string}.
+   *
+   * @throws IllegalArgumentException if {@code string} is empty (instead of an opaque
+   *     ArrayIndexOutOfBoundsException).
+   */
+  private static String lowercaseFirstLetter(String string) {
+    if (string.isEmpty()) {
+      throw new IllegalArgumentException("Client configuration property keys must not be empty.");
+    }
+    return Character.toLowerCase(string.charAt(0)) + string.substring(1);
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
new file mode 100644
index 0000000..1d50656
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.collection.IsMapContaining.hasEntry;
+
+import java.io.Closeable;
+import java.util.Properties;
+import org.apache.flink.kinesis.shaded.com.amazonaws.SDKGlobalConfiguration;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.junit.Test;
+
+/**
+ * Verifies the connector properties produced by {@link AwsAuthConfigProperties} for each kind of
+ * {@link AwsRegion} and {@link AwsCredentials} configuration, including the exact entry count.
+ */
+public class AwsAuthConfigPropertiesTest {
+
+  @Test
+  public void awsDefaultRegionProperties() {
+    // TODO Flink doesn't support auto region detection from the AWS provider chain,
+    // TODO so we always have to have the region settings available in the client side
+    // TODO this should no longer be a restriction once we fix this in the Flink connector side
+    // NOTE(review): the SDK's default region provider chain may consult the AWS_REGION environment
+    // variable before the system property set below — confirm; if so, this test depends on the
+    // environment it runs in.
+    try (final ScopedSystemProperty awsRegionSystemProps =
+        new ScopedSystemProperty(SDKGlobalConfiguration.AWS_REGION_SYSTEM_PROPERTY, "us-west-1")) {
+      final Properties properties =
+          AwsAuthConfigProperties.forAwsRegion(AwsRegion.fromDefaultProviderChain());
+
+      assertThat(properties.entrySet(), hasSize(1));
+      assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-west-1"));
+    }
+  }
+
+  @Test
+  public void awsSpecificRegionProperties() {
+    final Properties properties = AwsAuthConfigProperties.forAwsRegion(AwsRegion.ofId("us-east-2"));
+
+    assertThat(properties.entrySet(), hasSize(1));
+    assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-2"));
+  }
+
+  @Test
+  public void awsCustomEndpointRegionProperties() {
+    final Properties properties =
+        AwsAuthConfigProperties.forAwsRegion(
+            AwsRegion.ofCustomEndpoint("https://foo.bar:6666", "us-east-1"));
+
+    assertThat(properties.entrySet(), hasSize(2));
+    assertThat(properties, hasEntry(AWSConfigConstants.AWS_ENDPOINT, "https://foo.bar:6666"));
+    assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-1"));
+  }
+
+  @Test
+  public void awsDefaultCredentialsProperties() {
+    final Properties properties =
+        AwsAuthConfigProperties.forAwsCredentials(AwsCredentials.fromDefaultProviderChain());
+
+    assertThat(properties.entrySet(), hasSize(1));
+    assertThat(
+        properties,
+        hasEntry(
+            AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
+            AWSConfigConstants.CredentialProvider.AUTO.name()));
+  }
+
+  @Test
+  public void awsBasicCredentialsProperties() {
+    final Properties properties =
+        AwsAuthConfigProperties.forAwsCredentials(
+            AwsCredentials.basic("fake-access-key-id", "fake-secret-access-key"));
+
+    assertThat(properties.entrySet(), hasSize(3));
+    assertThat(
+        properties,
+        hasEntry(
+            AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
+            AWSConfigConstants.CredentialProvider.BASIC.name()));
+    assertThat(
+        properties,
+        hasEntry(
+            AWSConfigConstants.accessKeyId(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
+            "fake-access-key-id"));
+    assertThat(
+        properties,
+        hasEntry(
+            AWSConfigConstants.secretKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
+            "fake-secret-access-key"));
+  }
+
+  @Test
+  public void awsProfileCredentialsProperties() {
+    final Properties properties =
+        AwsAuthConfigProperties.forAwsCredentials(
+            AwsCredentials.profile("fake-profile", "/fake/profile/path"));
+
+    assertThat(properties.entrySet(), hasSize(3));
+    assertThat(
+        properties,
+        hasEntry(
+            AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
+            AWSConfigConstants.CredentialProvider.PROFILE.name()));
+    assertThat(
+        properties,
+        hasEntry(
+            AWSConfigConstants.profileName(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
+            "fake-profile"));
+    assertThat(
+        properties,
+        hasEntry(
+            AWSConfigConstants.profilePath(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
+            "/fake/profile/path"));
+  }
+
+  /**
+   * Sets a system property for the scope of a try-with-resources block. On close, it restores the
+   * previous value, or clears the property if it was previously unset.
+   */
+  private static class ScopedSystemProperty implements Closeable {
+
+    private final String key;
+    private final String previousValue;
+
+    private ScopedSystemProperty(String key, String value) {
+      this.key = key;
+      // System.setProperty returns the value that was replaced (null if there was none).
+      this.previousValue = System.setProperty(key, value);
+    }
+
+    @Override
+    public void close() {
+      if (previousValue != null) {
+        System.setProperty(key, previousValue);
+      } else {
+        System.clearProperty(key);
+      }
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProviderTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProviderTest.java
new file mode 100644
index 0000000..bb8cf36
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProviderTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
+import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.junit.Test;
+
+public class KinesisSourceProviderTest {
+
+  private static final String STREAM_NAME = "test-stream";
+
+  private static final IngressIdentifier<String> ID =
+      new IngressIdentifier<>(String.class, "namespace", "name");
+
+  /** A fully configured Kinesis ingress spec must be translated into a FlinkKinesisConsumer. */
+  @Test
+  public void exampleUsage() {
+    final SourceFunction<String> sourceFunction =
+        new KinesisSourceProvider().forSpec(exampleSpec());
+
+    assertThat(sourceFunction, instanceOf(FlinkKinesisConsumer.class));
+  }
+
+  /** Builds a spec with an explicit region, basic credentials, a deserializer and one stream. */
+  private static KinesisIngressSpec<String> exampleSpec() {
+    return KinesisIngressBuilder.forIdentifier(ID)
+        .withAwsRegion("us-west-1")
+        .withAwsCredentials(AwsCredentials.basic("access-key-id", "secret-access-key"))
+        .withDeserializer(TestDeserializer.class)
+        .withStream(STREAM_NAME)
+        .build();
+  }
+
+  /** Stub deserializer; only its class is handed to the builder in this test. */
+  private static final class TestDeserializer implements KinesisIngressDeserializer<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public String deserialize(IngressRecord ingressRecord) {
+      return null;
+    }
+  }
+}


[flink-statefun] 04/07: [FLINK-16124] [kinesis] Bind KinesisIngressProvider to Java Kinesis Ingress Type

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 059a9e87a56d8bfe4108ab9c0b1237a0f3506b38
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 00:05:31 2020 +0800

    [FLINK-16124] [kinesis] Bind KinesisIngressProvider to Java Kinesis Ingress Type
---
 .../flink/io/kinesis/KinesisFlinkIOModule.java     | 32 ++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java
new file mode 100644
index 0000000..cca3c55
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
+import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes;
+
+@AutoService(FlinkIoModule.class)
+/**
+ * {@link FlinkIoModule} that binds the Kinesis I/O types to their Flink runtime providers.
+ *
+ * <p>NOTE(review): registration presumably happens through the service-provider entry generated
+ * by {@link AutoService} (annotation on the preceding line) — confirm.
+ */
+public final class KinesisFlinkIOModule implements FlinkIoModule {
+
+  @Override
+  public void configure(Map<String, String> globalConfiguration, Binder binder) {
+    // globalConfiguration is not used by this module.
+    // Kinesis ingresses are turned into Flink sources by KinesisSourceProvider.
+    binder.bindSourceProvider(KinesisIOTypes.UNIVERSAL_INGRESS_TYPE, new KinesisSourceProvider());
+  }
+}


[flink-statefun] 06/07: [FLINK-16124] [kinesis] Implement runtime KinesisSinkProvider

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 2668eb12464f60b734057baa08ade318585a1982
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 11:24:48 2020 +0800

    [FLINK-16124] [kinesis] Implement runtime KinesisSinkProvider
---
 .../flink/io/kinesis/AwsAuthConfigProperties.java  |  28 +++++-
 .../CachingPartitionerSerializerDelegate.java      |  85 +++++++++++++++++
 .../flink/io/kinesis/KinesisSinkProvider.java      |  73 +++++++++++++++
 .../flink/io/kinesis/KinesisSourceProvider.java    |   2 +-
 .../io/kinesis/AwsAuthConfigPropertiesTest.java    |  49 ++++++++--
 .../CachingPartitionerSerializerDelegateTest.java  | 104 +++++++++++++++++++++
 .../flink/io/kinesis/KinesisSinkProviderTest.java  |  62 ++++++++++++
 7 files changed, 395 insertions(+), 8 deletions(-)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
index e1dfac6..ee7dad2 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.statefun.flink.io.kinesis;
 
+import java.net.URI;
 import java.util.Locale;
 import java.util.Properties;
 import org.apache.flink.kinesis.shaded.com.amazonaws.regions.DefaultAwsRegionProviderChain;
@@ -28,7 +29,7 @@ final class AwsAuthConfigProperties {
 
   private AwsAuthConfigProperties() {}
 
-  static Properties forAwsRegion(AwsRegion awsRegion) {
+  static Properties forAwsRegionConsumerProps(AwsRegion awsRegion) {
     final Properties properties = new Properties();
 
     if (awsRegion.isDefault()) {
@@ -46,6 +47,49 @@ final class AwsAuthConfigProperties {
     return properties;
   }
 
+  /**
+   * Creates the Kinesis producer properties for the given AWS region configuration.
+   *
+   * <p>For a custom endpoint, the producer expects the endpoint host and port as separate
+   * properties, so the endpoint URI is decomposed here.
+   *
+   * @param awsRegion the AWS region configuration.
+   * @return the region property, plus the endpoint host (and port, if specified) properties.
+   * @throws IllegalArgumentException if the custom service endpoint is not a valid URI or has no
+   *     host, e.g. a scheme-less {@code "localhost:4567"}.
+   * @throws IllegalStateException if the region configuration type is not recognized.
+   */
+  static Properties forAwsRegionProducerProps(AwsRegion awsRegion) {
+    final Properties properties = new Properties();
+
+    if (awsRegion.isDefault()) {
+      properties.setProperty(AWSConfigConstants.AWS_REGION, regionFromDefaultProviderChain());
+    } else if (awsRegion.isId()) {
+      properties.setProperty(AWSConfigConstants.AWS_REGION, awsRegion.asId().id());
+    } else if (awsRegion.isCustomEndpoint()) {
+      final AwsRegion.CustomEndpointAwsRegion customEndpoint = awsRegion.asCustomEndpoint();
+
+      final URI uri = URI.create(customEndpoint.serviceEndpoint());
+      final String host = uri.getHost();
+      if (host == null) {
+        // Properties rejects null values; fail with a descriptive error instead of a bare NPE
+        throw new IllegalArgumentException(
+            "Custom endpoint has no parsable host: " + customEndpoint.serviceEndpoint());
+      }
+      properties.setProperty("KinesisEndpoint", host);
+      properties.setProperty(AWSConfigConstants.AWS_REGION, customEndpoint.regionId());
+
+      final int port = uri.getPort();
+      if (port != -1) {
+        properties.setProperty("KinesisPort", String.valueOf(port));
+      }
+    } else {
+      throw new IllegalStateException("Unrecognized AWS region configuration type: " + awsRegion);
+    }
+
+    return properties;
+  }
+
   static Properties forAwsCredentials(AwsCredentials awsCredentials) {
     final Properties properties = new Properties();
 
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegate.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegate.java
new file mode 100644
index 0000000..757a73f
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegate.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+
+/**
+ * An implementation of a {@link KinesisPartitioner} and {@link KinesisSerializationSchema} that
+ * delegates partitioning and serialization to a wrapped {@link KinesisEgressSerializer}. Only the
+ * last processed element (compared by reference) is cached, which avoids duplicate serialization.
+ *
+ * <p>To avoid duplicate serialization, a shared instance of this is used as both the partitioner
+ * and the serialization schema within a single subtask of a {@link FlinkKinesisProducer}.
+ *
+ * <p>Note that this class is not thread-safe, and should not be accessed concurrently.
+ *
+ * @param <T> the type of the elements written to Kinesis.
+ */
+@NotThreadSafe
+final class CachingPartitionerSerializerDelegate<T> extends KinesisPartitioner<T>
+    implements KinesisSerializationSchema<T> {
+
+  private static final long serialVersionUID = 1L;
+
+  private final KinesisEgressSerializer<T> delegate;
+
+  // the most recent element and its record; transient, as they are runtime-only cache state
+  private transient T lastProcessedElement;
+  private transient EgressRecord lastSerializedRecord;
+
+  CachingPartitionerSerializerDelegate(KinesisEgressSerializer<T> delegate) {
+    this.delegate = Objects.requireNonNull(delegate);
+  }
+
+  @Override
+  public ByteBuffer serialize(T element) {
+    return ByteBuffer.wrap(getLastOrCreateNewSerializedRecord(element).getData());
+  }
+
+  @Override
+  public String getTargetStream(T element) {
+    return getLastOrCreateNewSerializedRecord(element).getStream();
+  }
+
+  @Override
+  public String getPartitionId(T element) {
+    return getLastOrCreateNewSerializedRecord(element).getPartitionKey();
+  }
+
+  @Override
+  public String getExplicitHashKey(T element) {
+    return getLastOrCreateNewSerializedRecord(element).getExplicitHashKey();
+  }
+
+  /**
+   * Returns the serialized record of {@code element}. The wrapped serializer is only invoked if
+   * {@code element} is not the same instance as the last processed element.
+   *
+   * @throws NullPointerException if the wrapped serializer returns {@code null}.
+   */
+  private EgressRecord getLastOrCreateNewSerializedRecord(T element) {
+    // a null record means that nothing has been cached yet (e.g. for a first null element)
+    if (lastSerializedRecord != null && element == lastProcessedElement) {
+      return lastSerializedRecord;
+    }
+    // update the cache only after serializing succeeded, so a failed attempt can never pair
+    // this element with the record of the previously processed element
+    final EgressRecord record =
+        Objects.requireNonNull(
+            delegate.serialize(element), "The KinesisEgressSerializer returned a null record.");
+    lastProcessedElement = element;
+    lastSerializedRecord = record;
+    return record;
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java
new file mode 100644
index 0000000..c1156b6
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import java.util.Properties;
+import org.apache.flink.statefun.flink.io.common.ReflectionUtil;
+import org.apache.flink.statefun.flink.io.spi.SinkProvider;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+
+/** Translates a {@link KinesisEgressSpec} into a {@link FlinkKinesisProducer} sink function. */
+final class KinesisSinkProvider implements SinkProvider {
+
+  @Override
+  public <T> SinkFunction<T> forSpec(EgressSpec<T> spec) {
+    final KinesisEgressSpec<T> kinesisSpec = asKinesisSpec(spec);
+    final KinesisEgressSerializer<T> userSerializer =
+        ReflectionUtil.instantiate(kinesisSpec.serializerClass());
+
+    // One shared instance serves as both the serialization schema and the custom partitioner,
+    // so that each element is handed to the user serializer only once.
+    final CachingPartitionerSerializerDelegate<T> delegate =
+        new CachingPartitionerSerializerDelegate<>(userSerializer);
+
+    final FlinkKinesisProducer<T> producer =
+        new FlinkKinesisProducer<>(delegate, propertiesFromSpec(kinesisSpec));
+    producer.setCustomPartitioner(delegate);
+    producer.setQueueLimit(kinesisSpec.maxOutstandingRecords());
+    // fail on error, which is needed for at-least-once delivery semantics to Kinesis
+    producer.setFailOnError(true);
+    return producer;
+  }
+
+  /**
+   * Builds the producer properties; the AWS region and credentials settings are added last and
+   * thus override identically named client configuration properties.
+   */
+  private static Properties propertiesFromSpec(KinesisEgressSpec<?> spec) {
+    final Properties merged = new Properties();
+    merged.putAll(spec.clientConfigurationProperties());
+    merged.putAll(AwsAuthConfigProperties.forAwsRegionProducerProps(spec.awsRegion()));
+    merged.putAll(AwsAuthConfigProperties.forAwsCredentials(spec.awsCredentials()));
+    return merged;
+  }
+
+  /** Casts {@code spec} to a {@link KinesisEgressSpec}, failing for null or other types. */
+  private static <T> KinesisEgressSpec<T> asKinesisSpec(EgressSpec<T> spec) {
+    if (spec == null) {
+      throw new NullPointerException("Unable to translate a NULL spec");
+    }
+    if (!(spec instanceof KinesisEgressSpec)) {
+      throw new IllegalArgumentException(String.format("Wrong type %s", spec.type()));
+    }
+    return (KinesisEgressSpec<T>) spec;
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
index dde1e2b..92665eb 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
@@ -65,7 +65,7 @@ final class KinesisSourceProvider implements SourceProvider {
     final Properties properties = new Properties();
 
     properties.putAll(resolveClientProperties(spec.clientConfigurationProperties()));
-    properties.putAll(AwsAuthConfigProperties.forAwsRegion(spec.awsRegion()));
+    properties.putAll(AwsAuthConfigProperties.forAwsRegionConsumerProps(spec.awsRegion()));
     properties.putAll(AwsAuthConfigProperties.forAwsCredentials(spec.awsCredentials()));
 
     setStartupPositionProperties(properties, spec.startupPosition());
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
index 1d50656..ccf9875 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
@@ -32,14 +32,14 @@ import org.junit.Test;
 public class AwsAuthConfigPropertiesTest {
 
   @Test
-  public void awsDefaultRegionProperties() {
+  public void awsDefaultRegionConsumerProperties() {
     // TODO Flink doesn't support auto region detection from the AWS provider chain,
     // TODO so we always have to have the region settings available in the client side
     // TODO this should no longer be a restriction once we fix this in the Flink connector side
     try (final ScopedSystemProperty awsRegionSystemProps =
         new ScopedSystemProperty(SDKGlobalConfiguration.AWS_REGION_SYSTEM_PROPERTY, "us-west-1")) {
       final Properties properties =
-          AwsAuthConfigProperties.forAwsRegion(AwsRegion.fromDefaultProviderChain());
+          AwsAuthConfigProperties.forAwsRegionConsumerProps(AwsRegion.fromDefaultProviderChain());
 
       assertThat(properties.entrySet(), hasSize(1));
       assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-west-1"));
@@ -47,17 +47,18 @@ public class AwsAuthConfigPropertiesTest {
   }
 
   @Test
-  public void awsSpecificRegionProperties() {
-    final Properties properties = AwsAuthConfigProperties.forAwsRegion(AwsRegion.ofId("us-east-2"));
+  public void awsSpecificRegionConsumerProperties() {
+    final Properties properties =
+        AwsAuthConfigProperties.forAwsRegionConsumerProps(AwsRegion.ofId("us-east-2"));
 
     assertThat(properties.entrySet(), hasSize(1));
     assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-2"));
   }
 
   @Test
-  public void awsCustomEndpointRegionProperties() {
+  public void awsCustomEndpointRegionConsumerProperties() {
     final Properties properties =
-        AwsAuthConfigProperties.forAwsRegion(
+        AwsAuthConfigProperties.forAwsRegionConsumerProps(
             AwsRegion.ofCustomEndpoint("https://foo.bar:6666", "us-east-1"));
 
     assertThat(properties.entrySet(), hasSize(2));
@@ -66,6 +67,54 @@ public class AwsAuthConfigPropertiesTest {
   }
 
   @Test
+  public void awsDefaultRegionProducerProperties() {
+    // TODO Flink doesn't support auto region detection from the AWS provider chain,
+    // TODO so we always have to have the region settings available in the client side
+    // TODO this should no longer be a restriction once we fix this in the Flink connector side
+    try (final ScopedSystemProperty awsRegionSystemProps =
+        new ScopedSystemProperty(SDKGlobalConfiguration.AWS_REGION_SYSTEM_PROPERTY, "us-west-1")) {
+      final Properties properties =
+          AwsAuthConfigProperties.forAwsRegionProducerProps(AwsRegion.fromDefaultProviderChain());
+
+      assertThat(properties.entrySet(), hasSize(1));
+      assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-west-1"));
+    }
+  }
+
+  @Test
+  public void awsSpecificRegionProducerProperties() {
+    final Properties properties =
+        AwsAuthConfigProperties.forAwsRegionProducerProps(AwsRegion.ofId("us-east-2"));
+
+    assertThat(properties.entrySet(), hasSize(1));
+    assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-2"));
+  }
+
+  @Test
+  public void awsCustomEndpointRegionProducerProperties() {
+    final Properties properties =
+        AwsAuthConfigProperties.forAwsRegionProducerProps(
+            AwsRegion.ofCustomEndpoint("https://foo.bar:6666", "us-east-1"));
+
+    assertThat(properties.entrySet(), hasSize(3));
+    assertThat(properties, hasEntry("KinesisEndpoint", "foo.bar"));
+    assertThat(properties, hasEntry("KinesisPort", "6666"));
+    assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-1"));
+  }
+
+  @Test
+  public void awsCustomEndpointWithoutPortRegionProducerProperties() {
+    final Properties properties =
+        AwsAuthConfigProperties.forAwsRegionProducerProps(
+            AwsRegion.ofCustomEndpoint("https://foo.bar", "us-east-1"));
+
+    // the endpoint URI has no explicit port, so no port property should be set
+    assertThat(properties.entrySet(), hasSize(2));
+    assertThat(properties, hasEntry("KinesisEndpoint", "foo.bar"));
+    assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-1"));
+  }
+
+  @Test
   public void awsDefaultCredentialsProperties() {
     final Properties properties =
         AwsAuthConfigProperties.forAwsCredentials(AwsCredentials.fromDefaultProviderChain());
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegateTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegateTest.java
new file mode 100644
index 0000000..a9fd97a
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegateTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.junit.Test;
+
+public class CachingPartitionerSerializerDelegateTest {
+
+  private static final String TEST_INPUT = "input";
+  private static final String TEST_STREAM = "stream";
+  private static final String TEST_PARTITION_KEY = "partition-key";
+  private static final String TEST_EXPLICIT_HASH_KEY = "explicit-hash-key";
+
+  @Test
+  public void noDuplicateSerialization() {
+    final CachingPartitionerSerializerDelegate<String> cachingDelegate =
+        new CachingPartitionerSerializerDelegate<>(new DuplicateSerializationDetectingSerializer());
+
+    cachingDelegate.serialize(TEST_INPUT);
+
+    // these throw if the wrapped serializer is used multiple times
+    cachingDelegate.getTargetStream(TEST_INPUT);
+    cachingDelegate.getPartitionId(TEST_INPUT);
+    cachingDelegate.getExplicitHashKey(TEST_INPUT);
+  }
+
+  @Test
+  public void differentElementIsSerializedAgain() {
+    final CountingSerializer countingSerializer = new CountingSerializer();
+    final CachingPartitionerSerializerDelegate<String> cachingDelegate =
+        new CachingPartitionerSerializerDelegate<>(countingSerializer);
+
+    cachingDelegate.serialize("first");
+    cachingDelegate.getPartitionId("first");
+    cachingDelegate.serialize("second");
+
+    // only the last processed element is cached, so "second" must be serialized again
+    assertThat(countingSerializer.invocations, is(2));
+  }
+
+  @Test
+  public void serialize() {
+    final CachingPartitionerSerializerDelegate<String> cachingDelegate =
+        new CachingPartitionerSerializerDelegate<>(new DuplicateSerializationDetectingSerializer());
+
+    assertThat(
+        cachingDelegate.serialize(TEST_INPUT),
+        is(ByteBuffer.wrap(TEST_INPUT.getBytes(StandardCharsets.UTF_8))));
+  }
+
+  @Test
+  public void targetStream() {
+    final CachingPartitionerSerializerDelegate<String> cachingDelegate =
+        new CachingPartitionerSerializerDelegate<>(new DuplicateSerializationDetectingSerializer());
+
+    assertThat(cachingDelegate.getTargetStream(TEST_INPUT), is(TEST_STREAM));
+  }
+
+  @Test
+  public void partitionId() {
+    final CachingPartitionerSerializerDelegate<String> cachingDelegate =
+        new CachingPartitionerSerializerDelegate<>(new DuplicateSerializationDetectingSerializer());
+
+    assertThat(cachingDelegate.getPartitionId(TEST_INPUT), is(TEST_PARTITION_KEY));
+  }
+
+  @Test
+  public void explicitHashKey() {
+    final CachingPartitionerSerializerDelegate<String> cachingDelegate =
+        new CachingPartitionerSerializerDelegate<>(new DuplicateSerializationDetectingSerializer());
+
+    assertThat(cachingDelegate.getExplicitHashKey(TEST_INPUT), is(TEST_EXPLICIT_HASH_KEY));
+  }
+
+  /** A serializer that fails the test if it is invoked more than once. */
+  private static class DuplicateSerializationDetectingSerializer
+      implements KinesisEgressSerializer<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    private boolean isInvoked;
+
+    @Override
+    public EgressRecord serialize(String value) {
+      if (isInvoked) {
+        fail("Duplicate serialization detected.");
+      }
+      isInvoked = true;
+      return testRecord(value);
+    }
+  }
+
+  /** A serializer that counts how many times it is invoked. */
+  private static class CountingSerializer implements KinesisEgressSerializer<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    private int invocations;
+
+    @Override
+    public EgressRecord serialize(String value) {
+      invocations++;
+      return testRecord(value);
+    }
+  }
+
+  /** Creates the record that both test serializers produce for {@code value}. */
+  private static EgressRecord testRecord(String value) {
+    return EgressRecord.newBuilder()
+        .withData(value.getBytes(StandardCharsets.UTF_8))
+        .withStream(TEST_STREAM)
+        .withPartitionKey(TEST_PARTITION_KEY)
+        .withExplicitHashKey(TEST_EXPLICIT_HASH_KEY)
+        .build();
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProviderTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProviderTest.java
new file mode 100644
index 0000000..cb0fe7f
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProviderTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.junit.Test;
+
+public class KinesisSinkProviderTest {
+
+  private static final EgressIdentifier<String> ID =
+      new EgressIdentifier<>("namespace", "name", String.class);
+
+  @Test
+  public void exampleUsage() {
+    final KinesisEgressSpec<String> kinesisEgressSpec =
+        KinesisEgressBuilder.forIdentifier(ID)
+            .withAwsRegion("us-west-1")
+            .withAwsCredentials(AwsCredentials.basic("access-key-id", "secret-access-key"))
+            .withSerializer(TestSerializer.class)
+            .build();
+
+    final KinesisSinkProvider provider = new KinesisSinkProvider();
+    final SinkFunction<String> sink = provider.forSpec(kinesisEgressSpec);
+
+    assertThat(sink, instanceOf(FlinkKinesisProducer.class));
+  }
+
+  private static final class TestSerializer implements KinesisEgressSerializer<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public EgressRecord serialize(String value) {
+      // never invoked: the test only checks the type of the constructed sink
+      return null;
+    }
+  }
+}


[flink-statefun] 07/07: [FLINK-16124] [kinesis] Bind KinesisSinkProvider to Java Kinesis Egress Type

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 4ac1dba269523c2ca41e955423a66c04e36eb6a7
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 11:25:28 2020 +0800

    [FLINK-16124] [kinesis] Bind KinesisSinkProvider to Java Kinesis Egress Type
    
    This closes #61.
---
 .../org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java
index cca3c55..c3cbd0a 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java
@@ -28,5 +28,6 @@ public final class KinesisFlinkIOModule implements FlinkIoModule {
   @Override
   public void configure(Map<String, String> globalConfiguration, Binder binder) {
     binder.bindSourceProvider(KinesisIOTypes.UNIVERSAL_INGRESS_TYPE, new KinesisSourceProvider());
+    binder.bindSinkProvider(KinesisIOTypes.UNIVERSAL_EGRESS_TYPE, new KinesisSinkProvider());
   }
 }