You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/19 04:22:13 UTC

[flink-statefun] 06/07: [FLINK-16124] [kinesis] Implement runtime KinesisSinkProvider

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 2668eb12464f60b734057baa08ade318585a1982
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 11:24:48 2020 +0800

    [FLINK-16124] [kinesis] Implement runtime KinesisSinkProvider
---
 .../flink/io/kinesis/AwsAuthConfigProperties.java  |  28 +++++-
 .../CachingPartitionerSerializerDelegate.java      |  85 +++++++++++++++++
 .../flink/io/kinesis/KinesisSinkProvider.java      |  73 +++++++++++++++
 .../flink/io/kinesis/KinesisSourceProvider.java    |   2 +-
 .../io/kinesis/AwsAuthConfigPropertiesTest.java    |  49 ++++++++--
 .../CachingPartitionerSerializerDelegateTest.java  | 104 +++++++++++++++++++++
 .../flink/io/kinesis/KinesisSinkProviderTest.java  |  62 ++++++++++++
 7 files changed, 395 insertions(+), 8 deletions(-)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
index e1dfac6..ee7dad2 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.statefun.flink.io.kinesis;
 
+import java.net.URI;
 import java.util.Locale;
 import java.util.Properties;
 import org.apache.flink.kinesis.shaded.com.amazonaws.regions.DefaultAwsRegionProviderChain;
@@ -28,7 +29,7 @@ final class AwsAuthConfigProperties {
 
   private AwsAuthConfigProperties() {}
 
-  static Properties forAwsRegion(AwsRegion awsRegion) {
+  static Properties forAwsRegionConsumerProps(AwsRegion awsRegion) {
     final Properties properties = new Properties();
 
     if (awsRegion.isDefault()) {
@@ -46,6 +47,31 @@ final class AwsAuthConfigProperties {
     return properties;
   }
 
+  /** Builds producer region properties; a custom endpoint must be a URI with a host. */
+  static Properties forAwsRegionProducerProps(AwsRegion awsRegion) {
+    final Properties properties = new Properties();
+    if (awsRegion.isDefault()) {
+      properties.setProperty(AWSConfigConstants.AWS_REGION, regionFromDefaultProviderChain());
+    } else if (awsRegion.isId()) {
+      properties.setProperty(AWSConfigConstants.AWS_REGION, awsRegion.asId().id());
+    } else if (awsRegion.isCustomEndpoint()) {
+      final AwsRegion.CustomEndpointAwsRegion customEndpoint = awsRegion.asCustomEndpoint();
+      final URI uri = URI.create(customEndpoint.serviceEndpoint());
+      if (uri.getHost() == null) { // scheme-less input such as "localhost:4567" has no host
+        throw new IllegalArgumentException("Custom endpoint has no host: " + uri);
+      }
+      properties.setProperty("KinesisEndpoint", uri.getHost());
+      properties.setProperty(AWSConfigConstants.AWS_REGION, customEndpoint.regionId());
+      final int port = uri.getPort();
+      if (port != -1) { // -1 means the endpoint URI carries no explicit port
+        properties.setProperty("KinesisPort", String.valueOf(port));
+      }
+    } else {
+      throw new IllegalStateException("Unrecognized AWS region configuration type: " + awsRegion);
+    }
+    return properties;
+  }
+
   static Properties forAwsCredentials(AwsCredentials awsCredentials) {
     final Properties properties = new Properties();
 
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegate.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegate.java
new file mode 100644
index 0000000..757a73f
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegate.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+
+/**
+ * An implementation of a {@link KinesisPartitioner} and {@link KinesisSerializationSchema}, that
+ * delegates partitioning and serialization to a wrapped {@link KinesisEgressSerializer}, while also
+ * caching the record of the last processed element (by identity) to avoid duplicate serialization.
+ *
+ * <p>To avoid duplicate serialization, a shared instance of this is used as both the partitioner
+ * and the serialization schema within a single subtask of a {@link FlinkKinesisProducer}.
+ *
+ * <p>Note that this class is not thread-safe, and should not be accessed concurrently.
+ *
+ * @param <T> the type of the elements that are serialized and partitioned
+ */
+@NotThreadSafe
+final class CachingPartitionerSerializerDelegate<T> extends KinesisPartitioner<T>
+    implements KinesisSerializationSchema<T> {
+
+  private static final long serialVersionUID = 1L;
+
+  private final KinesisEgressSerializer<T> delegate;
+  // single-entry cache; transient, so it starts out empty again after Java deserialization
+  private transient T lastProcessedElement;
+  private transient EgressRecord lastSerializedRecord;
+
+  CachingPartitionerSerializerDelegate(KinesisEgressSerializer<T> delegate) {
+    this.delegate = Objects.requireNonNull(delegate);
+  }
+
+  @Override
+  public ByteBuffer serialize(T element) {
+    return ByteBuffer.wrap(getLastOrCreateNewSerializedRecord(element).getData());
+  }
+
+  @Override
+  public String getTargetStream(T element) {
+    return getLastOrCreateNewSerializedRecord(element).getStream();
+  }
+
+  @Override
+  public String getPartitionId(T element) {
+    return getLastOrCreateNewSerializedRecord(element).getPartitionKey();
+  }
+
+  @Override
+  public String getExplicitHashKey(T element) {
+    return getLastOrCreateNewSerializedRecord(element).getExplicitHashKey();
+  }
+
+  private EgressRecord getLastOrCreateNewSerializedRecord(T element) {
+    // Identity (not equals) comparison; a null record means nothing has been cached yet.
+    if (lastSerializedRecord == null || element != lastProcessedElement) {
+      lastProcessedElement = element;
+      lastSerializedRecord = delegate.serialize(element);
+    }
+    return lastSerializedRecord;
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java
new file mode 100644
index 0000000..c1156b6
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import java.util.Properties;
+import org.apache.flink.statefun.flink.io.common.ReflectionUtil;
+import org.apache.flink.statefun.flink.io.spi.SinkProvider;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+
+final class KinesisSinkProvider implements SinkProvider {
+
+  /** Translates a Kinesis egress spec into a configured {@link FlinkKinesisProducer}. */
+  @Override
+  public <T> SinkFunction<T> forSpec(EgressSpec<T> spec) {
+    final KinesisEgressSpec<T> kinesisSpec = asKinesisSpec(spec);
+    final KinesisEgressSerializer<T> serializer = serializerInstanceFromSpec(kinesisSpec);
+    // one shared instance acts as both the serialization schema and the custom partitioner
+    final CachingPartitionerSerializerDelegate<T> sharedDelegate =
+        new CachingPartitionerSerializerDelegate<>(serializer);
+    final FlinkKinesisProducer<T> producer =
+        new FlinkKinesisProducer<>(sharedDelegate, propertiesFromSpec(kinesisSpec));
+    producer.setCustomPartitioner(sharedDelegate);
+    producer.setQueueLimit(kinesisSpec.maxOutstandingRecords());
+    // set fail on error, for at-least-once delivery semantics to Kinesis
+    producer.setFailOnError(true);
+    return producer;
+  }
+
+  /** Merges client configuration, then region, then credential properties (later puts win). */
+  private static Properties propertiesFromSpec(KinesisEgressSpec<?> spec) {
+    final Properties merged = new Properties();
+    merged.putAll(spec.clientConfigurationProperties());
+    merged.putAll(AwsAuthConfigProperties.forAwsRegionProducerProps(spec.awsRegion()));
+    merged.putAll(AwsAuthConfigProperties.forAwsCredentials(spec.awsCredentials()));
+    return merged;
+  }
+
+  /** Narrows the spec to a Kinesis spec; rejects {@code null} and any other spec type. */
+  private static <T> KinesisEgressSpec<T> asKinesisSpec(EgressSpec<T> spec) {
+    if (spec == null) {
+      throw new NullPointerException("Unable to translate a NULL spec");
+    }
+    if (!(spec instanceof KinesisEgressSpec)) {
+      throw new IllegalArgumentException(String.format("Wrong type %s", spec.type()));
+    }
+    return (KinesisEgressSpec<T>) spec;
+  }
+
+  /** Instantiates the serializer class declared by the spec via {@link ReflectionUtil}. */
+  private static <T> KinesisEgressSerializer<T> serializerInstanceFromSpec(
+      KinesisEgressSpec<T> spec) {
+    return ReflectionUtil.instantiate(spec.serializerClass());
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
index dde1e2b..92665eb 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
@@ -65,7 +65,7 @@ final class KinesisSourceProvider implements SourceProvider {
     final Properties properties = new Properties();
 
     properties.putAll(resolveClientProperties(spec.clientConfigurationProperties()));
-    properties.putAll(AwsAuthConfigProperties.forAwsRegion(spec.awsRegion()));
+    properties.putAll(AwsAuthConfigProperties.forAwsRegionConsumerProps(spec.awsRegion()));
     properties.putAll(AwsAuthConfigProperties.forAwsCredentials(spec.awsCredentials()));
 
     setStartupPositionProperties(properties, spec.startupPosition());
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
index 1d50656..ccf9875 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
@@ -32,14 +32,14 @@ import org.junit.Test;
 public class AwsAuthConfigPropertiesTest {
 
   @Test
-  public void awsDefaultRegionProperties() {
+  public void awsDefaultRegionConsumerProperties() {
     // TODO Flink doesn't support auto region detection from the AWS provider chain,
     // TODO so we always have to have the region settings available in the client side
     // TODO this should no longer be a restriction once we fix this in the Flink connector side
     try (final ScopedSystemProperty awsRegionSystemProps =
         new ScopedSystemProperty(SDKGlobalConfiguration.AWS_REGION_SYSTEM_PROPERTY, "us-west-1")) {
       final Properties properties =
-          AwsAuthConfigProperties.forAwsRegion(AwsRegion.fromDefaultProviderChain());
+          AwsAuthConfigProperties.forAwsRegionConsumerProps(AwsRegion.fromDefaultProviderChain());
 
       assertThat(properties.entrySet(), hasSize(1));
       assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-west-1"));
@@ -47,17 +47,18 @@ public class AwsAuthConfigPropertiesTest {
   }
 
   @Test
-  public void awsSpecificRegionProperties() {
-    final Properties properties = AwsAuthConfigProperties.forAwsRegion(AwsRegion.ofId("us-east-2"));
+  public void awsSpecificRegionConsumerProperties() {
+    final Properties properties =
+        AwsAuthConfigProperties.forAwsRegionConsumerProps(AwsRegion.ofId("us-east-2"));
 
     assertThat(properties.entrySet(), hasSize(1));
     assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-2"));
   }
 
   @Test
-  public void awsCustomEndpointRegionProperties() {
+  public void awsCustomEndpointRegionConsumerProperties() {
     final Properties properties =
-        AwsAuthConfigProperties.forAwsRegion(
+        AwsAuthConfigProperties.forAwsRegionConsumerProps(
             AwsRegion.ofCustomEndpoint("https://foo.bar:6666", "us-east-1"));
 
     assertThat(properties.entrySet(), hasSize(2));
@@ -66,6 +67,42 @@ public class AwsAuthConfigPropertiesTest {
   }
 
   @Test
+  public void awsDefaultRegionProducerProperties() {
+    // TODO Flink cannot yet auto-detect the region from the AWS provider chain, so the region
+    // TODO settings must be resolvable on the client side; this restriction goes away once it
+    // TODO is fixed in the Flink connector. Until then the region is pinned for this scope only.
+    try (final ScopedSystemProperty ignored =
+        new ScopedSystemProperty(SDKGlobalConfiguration.AWS_REGION_SYSTEM_PROPERTY, "us-west-1")) {
+      final AwsRegion defaultChainRegion = AwsRegion.fromDefaultProviderChain();
+      final Properties producerProps =
+          AwsAuthConfigProperties.forAwsRegionProducerProps(defaultChainRegion);
+      assertThat(producerProps.entrySet(), hasSize(1));
+      assertThat(producerProps, hasEntry(AWSConfigConstants.AWS_REGION, "us-west-1"));
+    }
+  }
+
+  @Test
+  public void awsSpecificRegionProducerProperties() {
+    // an explicit region id must come back as the one and only producer property
+    final Properties producerProps =
+        AwsAuthConfigProperties.forAwsRegionProducerProps(AwsRegion.ofId("us-east-2"));
+    assertThat(producerProps.entrySet(), hasSize(1));
+    assertThat(producerProps, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-2"));
+  }
+
+  @Test
+  public void awsCustomEndpointRegionProducerProperties() {
+    final AwsRegion endpoint = AwsRegion.ofCustomEndpoint("https://foo.bar:6666", "us-east-1");
+    final Properties producerProps = AwsAuthConfigProperties.forAwsRegionProducerProps(endpoint);
+
+    // the endpoint URI is split into separate host and port properties, next to the region
+    assertThat(producerProps.entrySet(), hasSize(3));
+    assertThat(producerProps, hasEntry("KinesisEndpoint", "foo.bar"));
+    assertThat(producerProps, hasEntry("KinesisPort", "6666"));
+    assertThat(producerProps, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-1"));
+  }
+
+  @Test
   public void awsDefaultCredentialsProperties() {
     final Properties properties =
         AwsAuthConfigProperties.forAwsCredentials(AwsCredentials.fromDefaultProviderChain());
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegateTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegateTest.java
new file mode 100644
index 0000000..a9fd97a
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegateTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.junit.Test;
+
+/** Unit tests for {@link CachingPartitionerSerializerDelegate}. */
+public class CachingPartitionerSerializerDelegateTest {
+
+  private static final String TEST_INPUT = "input";
+  private static final String TEST_STREAM = "stream";
+  private static final String TEST_PARTITION_KEY = "partition-key";
+  private static final String TEST_EXPLICIT_HASH_KEY = "explicit-hash-key";
+
+  /** The wrapped serializer must run only once across all accessors for the same element. */
+  @Test
+  public void noDuplicateSerialization() {
+    final CachingPartitionerSerializerDelegate<String> delegateUnderTest = newCachingDelegate();
+
+    delegateUnderTest.serialize(TEST_INPUT);
+
+    // these throw if the wrapped serializer is used multiple times
+    delegateUnderTest.getTargetStream(TEST_INPUT);
+    delegateUnderTest.getPartitionId(TEST_INPUT);
+    delegateUnderTest.getExplicitHashKey(TEST_INPUT);
+  }
+
+  /** serialize() wraps the data bytes of the record produced by the wrapped serializer. */
+  @Test
+  public void serialize() {
+    final ByteBuffer expectedBytes = ByteBuffer.wrap(TEST_INPUT.getBytes(StandardCharsets.UTF_8));
+
+    assertThat(newCachingDelegate().serialize(TEST_INPUT), is(expectedBytes));
+  }
+
+  /** The target stream is taken from the serialized record. */
+  @Test
+  public void targetStream() {
+    assertThat(newCachingDelegate().getTargetStream(TEST_INPUT), is(TEST_STREAM));
+  }
+
+  /** The partition id is the serialized record's partition key. */
+  @Test
+  public void partitionId() {
+    assertThat(newCachingDelegate().getPartitionId(TEST_INPUT), is(TEST_PARTITION_KEY));
+  }
+
+  /** The explicit hash key is taken from the serialized record. */
+  @Test
+  public void explicitHashKey() {
+    assertThat(newCachingDelegate().getExplicitHashKey(TEST_INPUT), is(TEST_EXPLICIT_HASH_KEY));
+  }
+
+  /** Creates a fresh delegate around a serializer that fails on a second invocation. */
+  private static CachingPartitionerSerializerDelegate<String> newCachingDelegate() {
+    return new CachingPartitionerSerializerDelegate<>(
+        new DuplicateSerializationDetectingSerializer());
+  }
+
+  /** Test serializer producing a fixed record and failing if invoked more than once. */
+  private static class DuplicateSerializationDetectingSerializer
+      implements KinesisEgressSerializer<String> {
+
+    private boolean isInvoked;
+
+    @Override
+    public EgressRecord serialize(String value) {
+      if (isInvoked) {
+        fail("Duplicate serialization detected.");
+      }
+      isInvoked = true;
+      return EgressRecord.newBuilder()
+          .withData(value.getBytes(StandardCharsets.UTF_8))
+          .withStream(TEST_STREAM)
+          .withPartitionKey(TEST_PARTITION_KEY)
+          .withExplicitHashKey(TEST_EXPLICIT_HASH_KEY)
+          .build();
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProviderTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProviderTest.java
new file mode 100644
index 0000000..cb0fe7f
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProviderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.junit.Test;
+
+/** Smoke test for {@link KinesisSinkProvider}. */
+public class KinesisSinkProviderTest {
+
+  private static final EgressIdentifier<String> ID =
+      new EgressIdentifier<>("namespace", "name", String.class);
+
+  /** A minimal Kinesis egress spec is translated into a {@link FlinkKinesisProducer}. */
+  @Test
+  public void exampleUsage() {
+    final KinesisEgressSpec<String> spec =
+        KinesisEgressBuilder.forIdentifier(ID)
+            .withAwsRegion("us-west-1")
+            .withAwsCredentials(AwsCredentials.basic("access-key-id", "secret-access-key"))
+            .withSerializer(TestSerializer.class)
+            .build();
+
+    final SinkFunction<String> sink = new KinesisSinkProvider().forSpec(spec);
+    assertThat(sink, instanceOf(FlinkKinesisProducer.class));
+  }
+
+  /** Serializer stub; this test never calls serialize() on it directly. */
+  private static final class TestSerializer implements KinesisEgressSerializer<String> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public EgressRecord serialize(String value) {
+      return null;
+    }
+  }
+}