You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/19 04:22:13 UTC
[flink-statefun] 06/07: [FLINK-16124] [kinesis] Implement runtime
KinesisSinkProvider
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 2668eb12464f60b734057baa08ade318585a1982
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 11:24:48 2020 +0800
[FLINK-16124] [kinesis] Implement runtime KinesisSinkProvider
---
.../flink/io/kinesis/AwsAuthConfigProperties.java | 28 +++++-
.../CachingPartitionerSerializerDelegate.java | 85 +++++++++++++++++
.../flink/io/kinesis/KinesisSinkProvider.java | 73 +++++++++++++++
.../flink/io/kinesis/KinesisSourceProvider.java | 2 +-
.../io/kinesis/AwsAuthConfigPropertiesTest.java | 49 ++++++++--
.../CachingPartitionerSerializerDelegateTest.java | 104 +++++++++++++++++++++
.../flink/io/kinesis/KinesisSinkProviderTest.java | 62 ++++++++++++
7 files changed, 395 insertions(+), 8 deletions(-)
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
index e1dfac6..ee7dad2 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigProperties.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.statefun.flink.io.kinesis;
+import java.net.URI;
import java.util.Locale;
import java.util.Properties;
import org.apache.flink.kinesis.shaded.com.amazonaws.regions.DefaultAwsRegionProviderChain;
@@ -28,7 +29,7 @@ final class AwsAuthConfigProperties {
private AwsAuthConfigProperties() {}
- static Properties forAwsRegion(AwsRegion awsRegion) {
+ static Properties forAwsRegionConsumerProps(AwsRegion awsRegion) {
final Properties properties = new Properties();
if (awsRegion.isDefault()) {
@@ -46,6 +47,31 @@ final class AwsAuthConfigProperties {
return properties;
}
+ static Properties forAwsRegionProducerProps(AwsRegion awsRegion) {
+ final Properties properties = new Properties();
+
+ if (awsRegion.isDefault()) {
+ properties.setProperty(AWSConfigConstants.AWS_REGION, regionFromDefaultProviderChain());
+ } else if (awsRegion.isId()) {
+ properties.setProperty(AWSConfigConstants.AWS_REGION, awsRegion.asId().id());
+ } else if (awsRegion.isCustomEndpoint()) {
+ final AwsRegion.CustomEndpointAwsRegion customEndpoint = awsRegion.asCustomEndpoint();
+
+ final URI uri = URI.create(customEndpoint.serviceEndpoint());
+ properties.setProperty("KinesisEndpoint", uri.getHost());
+ properties.setProperty(AWSConfigConstants.AWS_REGION, customEndpoint.regionId());
+
+ int port = uri.getPort();
+ if (port != -1) {
+ properties.setProperty("KinesisPort", String.valueOf(port));
+ }
+ } else {
+ throw new IllegalStateException("Unrecognized AWS region configuration type: " + awsRegion);
+ }
+
+ return properties;
+ }
+
static Properties forAwsCredentials(AwsCredentials awsCredentials) {
final Properties properties = new Properties();
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegate.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegate.java
new file mode 100644
index 0000000..757a73f
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegate.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+
+/**
+ * An implementation of a {@link KinesisPartitioner} and {@link KinesisSerializationSchema}, that
+ * delegates partitioning and serialization to a wrapped {@link KinesisEgressSerializer}, while also
+ * caching already processed element objects to avoid duplicate serialization.
+ *
+ * <p>To avoid duplicate serialization, a shared instance of this is used as both the partitioner
+ * and the serialization schema within a single subtask of a {@link FlinkKinesisProducer}.
+ *
+ * <p>Note that this class is not thread-safe, and should not be accessed concurrently.
+ *
+ * @param <T> the type of the elements that are partitioned and serialized
+ */
+@NotThreadSafe
+final class CachingPartitionerSerializerDelegate<T> extends KinesisPartitioner<T>
+ implements KinesisSerializationSchema<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final KinesisEgressSerializer<T> delegate;
+
+ private transient T lastProcessedElement;
+ private transient EgressRecord lastSerializedRecord;
+
+ CachingPartitionerSerializerDelegate(KinesisEgressSerializer<T> delegate) {
+ this.delegate = Objects.requireNonNull(delegate);
+ }
+
+ @Override
+ public ByteBuffer serialize(T element) {
+ return ByteBuffer.wrap(getLastOrCreateNewSerializedRecord(element).getData());
+ }
+
+ @Override
+ public String getTargetStream(T element) {
+ return getLastOrCreateNewSerializedRecord(element).getStream();
+ }
+
+ @Override
+ public String getPartitionId(T element) {
+ return getLastOrCreateNewSerializedRecord(element).getPartitionKey();
+ }
+
+ @Override
+ public String getExplicitHashKey(T element) {
+ return getLastOrCreateNewSerializedRecord(element).getExplicitHashKey();
+ }
+
+ private EgressRecord getLastOrCreateNewSerializedRecord(T element) {
+ if (element == lastProcessedElement) {
+ return lastSerializedRecord;
+ }
+ lastProcessedElement = element;
+ lastSerializedRecord = delegate.serialize(element);
+ return lastSerializedRecord;
+ }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java
new file mode 100644
index 0000000..c1156b6
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import java.util.Properties;
+import org.apache.flink.statefun.flink.io.common.ReflectionUtil;
+import org.apache.flink.statefun.flink.io.spi.SinkProvider;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+
+final class KinesisSinkProvider implements SinkProvider {
+
+ @Override
+ public <T> SinkFunction<T> forSpec(EgressSpec<T> spec) {
+ final KinesisEgressSpec<T> kinesisEgressSpec = asKinesisSpec(spec);
+
+ final CachingPartitionerSerializerDelegate<T> partitionerSerializerDelegate =
+ new CachingPartitionerSerializerDelegate<>(serializerInstanceFromSpec(kinesisEgressSpec));
+
+ final FlinkKinesisProducer<T> kinesisProducer =
+ new FlinkKinesisProducer<>(
+ partitionerSerializerDelegate, propertiesFromSpec(kinesisEgressSpec));
+ kinesisProducer.setCustomPartitioner(partitionerSerializerDelegate);
+ kinesisProducer.setQueueLimit(kinesisEgressSpec.maxOutstandingRecords());
+ // set fail on error, for at-least-once delivery semantics to Kinesis
+ kinesisProducer.setFailOnError(true);
+
+ return kinesisProducer;
+ }
+
+ private static Properties propertiesFromSpec(KinesisEgressSpec<?> spec) {
+ final Properties properties = new Properties();
+
+ properties.putAll(spec.clientConfigurationProperties());
+ properties.putAll(AwsAuthConfigProperties.forAwsRegionProducerProps(spec.awsRegion()));
+ properties.putAll(AwsAuthConfigProperties.forAwsCredentials(spec.awsCredentials()));
+
+ return properties;
+ }
+
+ private static <T> KinesisEgressSpec<T> asKinesisSpec(EgressSpec<T> spec) {
+ if (spec instanceof KinesisEgressSpec) {
+ return (KinesisEgressSpec<T>) spec;
+ }
+ if (spec == null) {
+ throw new NullPointerException("Unable to translate a NULL spec");
+ }
+ throw new IllegalArgumentException(String.format("Wrong type %s", spec.type()));
+ }
+
+ private static <T> KinesisEgressSerializer<T> serializerInstanceFromSpec(
+ KinesisEgressSpec<T> spec) {
+ return ReflectionUtil.instantiate(spec.serializerClass());
+ }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
index dde1e2b..92665eb 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
@@ -65,7 +65,7 @@ final class KinesisSourceProvider implements SourceProvider {
final Properties properties = new Properties();
properties.putAll(resolveClientProperties(spec.clientConfigurationProperties()));
- properties.putAll(AwsAuthConfigProperties.forAwsRegion(spec.awsRegion()));
+ properties.putAll(AwsAuthConfigProperties.forAwsRegionConsumerProps(spec.awsRegion()));
properties.putAll(AwsAuthConfigProperties.forAwsCredentials(spec.awsCredentials()));
setStartupPositionProperties(properties, spec.startupPosition());
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
index 1d50656..ccf9875 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/AwsAuthConfigPropertiesTest.java
@@ -32,14 +32,14 @@ import org.junit.Test;
public class AwsAuthConfigPropertiesTest {
@Test
- public void awsDefaultRegionProperties() {
+ public void awsDefaultRegionConsumerProperties() {
// TODO Flink doesn't support auto region detection from the AWS provider chain,
// TODO so we always have to have the region settings available in the client side
// TODO this should no longer be a restriction once we fix this in the Flink connector side
try (final ScopedSystemProperty awsRegionSystemProps =
new ScopedSystemProperty(SDKGlobalConfiguration.AWS_REGION_SYSTEM_PROPERTY, "us-west-1")) {
final Properties properties =
- AwsAuthConfigProperties.forAwsRegion(AwsRegion.fromDefaultProviderChain());
+ AwsAuthConfigProperties.forAwsRegionConsumerProps(AwsRegion.fromDefaultProviderChain());
assertThat(properties.entrySet(), hasSize(1));
assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-west-1"));
@@ -47,17 +47,18 @@ public class AwsAuthConfigPropertiesTest {
}
@Test
- public void awsSpecificRegionProperties() {
- final Properties properties = AwsAuthConfigProperties.forAwsRegion(AwsRegion.ofId("us-east-2"));
+ public void awsSpecificRegionConsumerProperties() {
+ final Properties properties =
+ AwsAuthConfigProperties.forAwsRegionConsumerProps(AwsRegion.ofId("us-east-2"));
assertThat(properties.entrySet(), hasSize(1));
assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-2"));
}
@Test
- public void awsCustomEndpointRegionProperties() {
+ public void awsCustomEndpointRegionConsumerProperties() {
final Properties properties =
- AwsAuthConfigProperties.forAwsRegion(
+ AwsAuthConfigProperties.forAwsRegionConsumerProps(
AwsRegion.ofCustomEndpoint("https://foo.bar:6666", "us-east-1"));
assertThat(properties.entrySet(), hasSize(2));
@@ -66,6 +67,42 @@ public class AwsAuthConfigPropertiesTest {
}
@Test
+ public void awsDefaultRegionProducerProperties() {
+ // TODO Flink doesn't support auto region detection from the AWS provider chain,
+ // TODO so we always have to have the region settings available in the client side
+ // TODO this should no longer be a restriction once we fix this in the Flink connector side
+ try (final ScopedSystemProperty awsRegionSystemProps =
+ new ScopedSystemProperty(SDKGlobalConfiguration.AWS_REGION_SYSTEM_PROPERTY, "us-west-1")) {
+ final Properties properties =
+ AwsAuthConfigProperties.forAwsRegionProducerProps(AwsRegion.fromDefaultProviderChain());
+
+ assertThat(properties.entrySet(), hasSize(1));
+ assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-west-1"));
+ }
+ }
+
+ @Test
+ public void awsSpecificRegionProducerProperties() {
+ final Properties properties =
+ AwsAuthConfigProperties.forAwsRegionProducerProps(AwsRegion.ofId("us-east-2"));
+
+ assertThat(properties.entrySet(), hasSize(1));
+ assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-2"));
+ }
+
+ @Test
+ public void awsCustomEndpointRegionProducerProperties() {
+ final Properties properties =
+ AwsAuthConfigProperties.forAwsRegionProducerProps(
+ AwsRegion.ofCustomEndpoint("https://foo.bar:6666", "us-east-1"));
+
+ assertThat(properties.entrySet(), hasSize(3));
+ assertThat(properties, hasEntry("KinesisEndpoint", "foo.bar"));
+ assertThat(properties, hasEntry("KinesisPort", "6666"));
+ assertThat(properties, hasEntry(AWSConfigConstants.AWS_REGION, "us-east-1"));
+ }
+
+ @Test
public void awsDefaultCredentialsProperties() {
final Properties properties =
AwsAuthConfigProperties.forAwsCredentials(AwsCredentials.fromDefaultProviderChain());
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegateTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegateTest.java
new file mode 100644
index 0000000..a9fd97a
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/CachingPartitionerSerializerDelegateTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.junit.Test;
+
+public class CachingPartitionerSerializerDelegateTest {
+
+ private static final String TEST_INPUT = "input";
+ private static final String TEST_STREAM = "stream";
+ private static final String TEST_PARTITION_KEY = "partition-key";
+ private static final String TEST_EXPLICIT_HASH_KEY = "explicit-hash-key";
+
+ @Test
+ public void noDuplicateSerialization() {
+ final CachingPartitionerSerializerDelegate<String> cachingDelegate =
+ new CachingPartitionerSerializerDelegate<>(new DuplicateSerializationDetectingSerializer());
+
+ cachingDelegate.serialize(TEST_INPUT);
+
+ // these throw if the wrapped serializer is used multiple times
+ cachingDelegate.getTargetStream(TEST_INPUT);
+ cachingDelegate.getPartitionId(TEST_INPUT);
+ cachingDelegate.getExplicitHashKey(TEST_INPUT);
+ }
+
+ @Test
+ public void serialize() {
+ final CachingPartitionerSerializerDelegate<String> cachingDelegate =
+ new CachingPartitionerSerializerDelegate<>(new DuplicateSerializationDetectingSerializer());
+
+ assertThat(
+ cachingDelegate.serialize(TEST_INPUT),
+ is(ByteBuffer.wrap(TEST_INPUT.getBytes(StandardCharsets.UTF_8))));
+ }
+
+ @Test
+ public void targetStream() {
+ final CachingPartitionerSerializerDelegate<String> cachingDelegate =
+ new CachingPartitionerSerializerDelegate<>(new DuplicateSerializationDetectingSerializer());
+
+ assertThat(cachingDelegate.getTargetStream(TEST_INPUT), is(TEST_STREAM));
+ }
+
+ @Test
+ public void partitionId() {
+ final CachingPartitionerSerializerDelegate<String> cachingDelegate =
+ new CachingPartitionerSerializerDelegate<>(new DuplicateSerializationDetectingSerializer());
+
+ assertThat(cachingDelegate.getPartitionId(TEST_INPUT), is(TEST_PARTITION_KEY));
+ }
+
+ @Test
+ public void explicitHashKey() {
+ final CachingPartitionerSerializerDelegate<String> cachingDelegate =
+ new CachingPartitionerSerializerDelegate<>(new DuplicateSerializationDetectingSerializer());
+
+ assertThat(cachingDelegate.getExplicitHashKey(TEST_INPUT), is(TEST_EXPLICIT_HASH_KEY));
+ }
+
+ private static class DuplicateSerializationDetectingSerializer
+ implements KinesisEgressSerializer<String> {
+
+ private boolean isInvoked;
+
+ @Override
+ public EgressRecord serialize(String value) {
+ if (isInvoked) {
+ fail("Duplicate serialization detected.");
+ }
+ isInvoked = true;
+ return EgressRecord.newBuilder()
+ .withData(value.getBytes(StandardCharsets.UTF_8))
+ .withStream(TEST_STREAM)
+ .withPartitionKey(TEST_PARTITION_KEY)
+ .withExplicitHashKey(TEST_EXPLICIT_HASH_KEY)
+ .build();
+ }
+ }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProviderTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProviderTest.java
new file mode 100644
index 0000000..cb0fe7f
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProviderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kinesis;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.junit.Test;
+
+public class KinesisSinkProviderTest {
+
+ private static final EgressIdentifier<String> ID =
+ new EgressIdentifier<>("namespace", "name", String.class);
+
+ @Test
+ public void exampleUsage() {
+ final KinesisEgressSpec<String> kinesisEgressSpec =
+ KinesisEgressBuilder.forIdentifier(ID)
+ .withAwsRegion("us-west-1")
+ .withAwsCredentials(AwsCredentials.basic("access-key-id", "secret-access-key"))
+ .withSerializer(TestSerializer.class)
+ .build();
+
+ final KinesisSinkProvider provider = new KinesisSinkProvider();
+ final SinkFunction<String> sink = provider.forSpec(kinesisEgressSpec);
+
+ assertThat(sink, instanceOf(FlinkKinesisProducer.class));
+ }
+
+ private static final class TestSerializer implements KinesisEgressSerializer<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public EgressRecord serialize(String value) {
+ return null;
+ }
+ }
+}