You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/19 04:22:12 UTC
[flink-statefun] 05/07: [FLINK-16124] [kinesis] Add Java Kinesis Egress SDK classes
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit f4823097e11c45c9c289100bb6e046f215aff7e8
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 11:24:05 2020 +0800
[FLINK-16124] [kinesis] Add Java Kinesis Egress SDK classes
---
.../flink/statefun/sdk/kinesis/KinesisIOTypes.java | 3 +
.../statefun/sdk/kinesis/egress/EgressRecord.java | 98 ++++++++++++++
.../sdk/kinesis/egress/KinesisEgressBuilder.java | 149 +++++++++++++++++++++
.../KinesisEgressSerializer.java} | 23 +++-
.../sdk/kinesis/egress/KinesisEgressSpec.java | 81 +++++++++++
.../sdk/kinesis/KinesisEgressBuilderTest.java | 58 ++++++++
tools/maven/spotbugs-exclude.xml | 8 ++
7 files changed, 413 insertions(+), 7 deletions(-)
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
index 4b57b0a..346451e 100644
--- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.statefun.sdk.kinesis;
+import org.apache.flink.statefun.sdk.EgressType;
import org.apache.flink.statefun.sdk.IngressType;
public final class KinesisIOTypes {
@@ -25,4 +26,6 @@ public final class KinesisIOTypes {
public static final IngressType UNIVERSAL_INGRESS_TYPE =
new IngressType("statefun.kinesis.io", "universal-ingress");
+ public static final EgressType UNIVERSAL_EGRESS_TYPE =
+ new EgressType("statefun.kinesis.io", "universal-egress");
}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/EgressRecord.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/EgressRecord.java
new file mode 100644
index 0000000..ac8c5bb
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/EgressRecord.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.egress;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/** A record to be written to AWS Kinesis. */
+public final class EgressRecord {
+
+ private final byte[] data;
+ private final String stream;
+ private final String partitionKey;
+ @Nullable private final String explicitHashKey;
+
+ /** @return A builder for a {@link EgressRecord}. */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ private EgressRecord(
+ byte[] data, String stream, String partitionKey, @Nullable String explicitHashKey) {
+ this.data = Objects.requireNonNull(data, "data bytes");
+ this.stream = Objects.requireNonNull(stream, "target stream");
+ this.partitionKey = Objects.requireNonNull(partitionKey, "partition key");
+ this.explicitHashKey = explicitHashKey;
+ }
+
+ /** @return data bytes to write */
+ public byte[] getData() {
+ return data;
+ }
+
+ /** @return target AWS Kinesis stream to write to. */
+ public String getStream() {
+ return stream;
+ }
+
+ /** @return partition key to use when writing the record to AWS Kinesis. */
+ public String getPartitionKey() {
+ return partitionKey;
+ }
+
+ /** @return explicit hash key to use when writing the record to AWS Kinesis. */
+ @Nullable
+ public String getExplicitHashKey() {
+ return explicitHashKey;
+ }
+
+ /** Builder for {@link EgressRecord}. */
+ public static final class Builder {
+ private byte[] data;
+ private String stream;
+ private String partitionKey;
+ private String explicitHashKey;
+
+ private Builder() {}
+
+ public Builder withData(byte[] data) {
+ this.data = data;
+ return this;
+ }
+
+ public Builder withStream(String stream) {
+ this.stream = stream;
+ return this;
+ }
+
+ public Builder withPartitionKey(String partitionKey) {
+ this.partitionKey = partitionKey;
+ return this;
+ }
+
+ public Builder withExplicitHashKey(String explicitHashKey) {
+ this.explicitHashKey = explicitHashKey;
+ return this;
+ }
+
+ public EgressRecord build() {
+ return new EgressRecord(data, stream, partitionKey, explicitHashKey);
+ }
+ }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressBuilder.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressBuilder.java
new file mode 100644
index 0000000..98406c7
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressBuilder.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.egress;
+
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+/**
+ * A builder for creating an {@link EgressSpec} for writing data to AWS Kinesis.
+ *
+ * @param <T> The type written to AWS Kinesis.
+ */
+public final class KinesisEgressBuilder<T> {
+
+ private final EgressIdentifier<T> id;
+
+ private Class<? extends KinesisEgressSerializer<T>> serializerClass;
+ private int maxOutstandingRecords = 1000;
+ private AwsRegion awsRegion = AwsRegion.fromDefaultProviderChain();
+ private AwsCredentials awsCredentials = AwsCredentials.fromDefaultProviderChain();
+ private final Properties clientConfigurationProperties = new Properties();
+
+ private KinesisEgressBuilder(EgressIdentifier<T> id) {
+ this.id = Objects.requireNonNull(id);
+ }
+
+ /**
+ * @param id A unique egress identifier.
+ * @param <T> The type consumed from Kinesis.
+ * @return A new {@link KinesisEgressBuilder}.
+ */
+ public static <T> KinesisEgressBuilder<T> forIdentifier(EgressIdentifier<T> id) {
+ return new KinesisEgressBuilder<>(id);
+ }
+
+ /**
+ * @param serializerClass The serializer used to convert from Java objects to Kinesis's byte
+ * messages.
+ */
+ public KinesisEgressBuilder<T> withSerializer(
+ Class<? extends KinesisEgressSerializer<T>> serializerClass) {
+ this.serializerClass = Objects.requireNonNull(serializerClass);
+ return this;
+ }
+
+ /**
+ * The AWS region to connect to. By default, AWS's default provider chain is consulted.
+ *
+ * @param awsRegion The AWS region to connect to.
+ * @see <a
+ * href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-region-selection.html#automatically-determine-the-aws-region-from-the-environment">Automatically
+ * Determine the AWS Region from the Environment</a>.
+ * @see AwsRegion
+ */
+ public KinesisEgressBuilder<T> withAwsRegion(AwsRegion awsRegion) {
+ this.awsRegion = Objects.requireNonNull(awsRegion);
+ return this;
+ }
+
+ /**
+ * The AWS region to connect to, specified by the AWS region's unique id. By default, AWS's
+ * default provider chain is consulted.
+ *
+ * @param regionName The unique id of the AWS region to connect to.
+ */
+ public KinesisEgressBuilder<T> withAwsRegion(String regionName) {
+ this.awsRegion = AwsRegion.ofId(regionName);
+ return this;
+ }
+
+ /**
+ * The AWS credentials to use. By default, AWS's default provider chain is consulted.
+ *
+ * @param awsCredentials The AWS credentials to use.
+ * @see <a
+ * href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default">Using
+ * the Default Credential Provider Chain</a>.
+ * @see AwsCredentials
+ */
+ public KinesisEgressBuilder<T> withAwsCredentials(AwsCredentials awsCredentials) {
+ this.awsCredentials = Objects.requireNonNull(awsCredentials);
+ return this;
+ }
+
+ /**
+ * The maximum number of buffered outstanding records, before backpressure is applied by the
+ * egress.
+ *
+ * @param maxOutstandingRecords the maximum number of buffered outstanding records
+ */
+ public KinesisEgressBuilder<T> withMaxOutstandingRecords(int maxOutstandingRecords) {
+ if (maxOutstandingRecords <= 0) {
+ throw new IllegalArgumentException("Max outstanding records must be larger than 0.");
+ }
+ this.maxOutstandingRecords = maxOutstandingRecords;
+ return this;
+ }
+
+ /**
+ * Sets a AWS client configuration to be used by the egress.
+ *
+ * <p>Supported values are properties of AWS's <a
+ * href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/latest/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">ccom.amazonaws.services.kinesis.producer.KinesisProducerConfiguration</a>.
+ * Please see <a
+ * href="https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties">Default
+ * Configuration Properties</a> for a full list of the keys.
+ *
+ * @param key the property to set.
+ * @param value the value for the property.
+ * @see <a
+ * href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.aws.ClientConfiguration</a>.
+ */
+ public KinesisEgressBuilder<T> withClientConfigurationProperty(String key, String value) {
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(value);
+ this.clientConfigurationProperties.setProperty(key, value);
+ return this;
+ }
+
+ /** @return A new {@link KinesisEgressSpec}. */
+ public KinesisEgressSpec<T> build() {
+ return new KinesisEgressSpec<>(
+ id,
+ serializerClass,
+ maxOutstandingRecords,
+ awsRegion,
+ awsCredentials,
+ clientConfigurationProperties);
+ }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
similarity index 58%
copy from statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
copy to statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
index 4b57b0a..46f605d 100644
--- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
@@ -15,14 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.statefun.sdk.kinesis;
+package org.apache.flink.statefun.sdk.kinesis.egress;
-import org.apache.flink.statefun.sdk.IngressType;
+import java.io.Serializable;
-public final class KinesisIOTypes {
-
- private KinesisIOTypes() {}
+/**
+ * Defines how to serialize values of type {@code T} into {@link EgressRecord}s to be written to AWS
+ * Kinesis.
+ *
+ * @param <T> the type of values being written.
+ */
+public interface KinesisEgressSerializer<T> extends Serializable {
- public static final IngressType UNIVERSAL_INGRESS_TYPE =
- new IngressType("statefun.kinesis.io", "universal-ingress");
+ /**
+ * Serialize an output value into a {@link EgressRecord} to be written to AWS Kinesis.
+ *
+ * @param value the output value to write.
+ * @return a {@link EgressRecord} to be written to AWS Kinesis.
+ */
+ EgressRecord serialize(T value);
}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSpec.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSpec.java
new file mode 100644
index 0000000..7f6b369
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSpec.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.egress;
+
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.EgressType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+public final class KinesisEgressSpec<T> implements EgressSpec<T> {
+ private final EgressIdentifier<T> egressIdentifier;
+ private final Class<? extends KinesisEgressSerializer<T>> serializerClass;
+ private final int maxOutstandingRecords;
+ private final AwsRegion awsRegion;
+ private final AwsCredentials awsCredentials;
+ private final Properties clientConfigurationProperties;
+
+ KinesisEgressSpec(
+ EgressIdentifier<T> egressIdentifier,
+ Class<? extends KinesisEgressSerializer<T>> serializerClass,
+ int maxOutstandingRecords,
+ AwsRegion awsRegion,
+ AwsCredentials awsCredentials,
+ Properties clientConfigurationProperties) {
+ this.egressIdentifier = Objects.requireNonNull(egressIdentifier);
+ this.serializerClass = Objects.requireNonNull(serializerClass);
+ this.maxOutstandingRecords = maxOutstandingRecords;
+ this.awsRegion = Objects.requireNonNull(awsRegion);
+ this.awsCredentials = Objects.requireNonNull(awsCredentials);
+ this.clientConfigurationProperties = Objects.requireNonNull(clientConfigurationProperties);
+ }
+
+ @Override
+ public EgressIdentifier<T> id() {
+ return egressIdentifier;
+ }
+
+ @Override
+ public EgressType type() {
+ return KinesisIOTypes.UNIVERSAL_EGRESS_TYPE;
+ }
+
+ public Class<? extends KinesisEgressSerializer<T>> serializerClass() {
+ return serializerClass;
+ }
+
+ public int maxOutstandingRecords() {
+ return maxOutstandingRecords;
+ }
+
+ public AwsRegion awsRegion() {
+ return awsRegion;
+ }
+
+ public AwsCredentials awsCredentials() {
+ return awsCredentials;
+ }
+
+ public Properties clientConfigurationProperties() {
+ return clientConfigurationProperties;
+ }
+}
diff --git a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisEgressBuilderTest.java b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisEgressBuilderTest.java
new file mode 100644
index 0000000..e4406c4
--- /dev/null
+++ b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisEgressBuilderTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
+import org.junit.Test;
+
+public class KinesisEgressBuilderTest {
+
+ private static final EgressIdentifier<String> ID =
+ new EgressIdentifier<>("namespace", "name", String.class);
+
+ @Test
+ public void exampleUsage() {
+ final KinesisEgressSpec<String> kinesisEgressSpec =
+ KinesisEgressBuilder.forIdentifier(ID).withSerializer(TestSerializer.class).build();
+
+ assertThat(kinesisEgressSpec.id(), is(ID));
+ assertTrue(kinesisEgressSpec.awsRegion().isDefault());
+ assertTrue(kinesisEgressSpec.awsCredentials().isDefault());
+ assertEquals(TestSerializer.class, kinesisEgressSpec.serializerClass());
+ assertTrue(kinesisEgressSpec.clientConfigurationProperties().isEmpty());
+ }
+
+ private static final class TestSerializer implements KinesisEgressSerializer<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public EgressRecord serialize(String value) {
+ return null;
+ }
+ }
+}
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
index 9bd3076..694e52d 100644
--- a/tools/maven/spotbugs-exclude.xml
+++ b/tools/maven/spotbugs-exclude.xml
@@ -74,6 +74,14 @@ under the License.
<Class name="~org\.apache.flink\.statefun\.sdk\.kinesis\.ingress\.IngressRecord\$Builder"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
+ <Match>
+ <Class name="~org\.apache.flink\.statefun\.sdk\.kinesis\.egress\.EgressRecord"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+ <Match>
+ <Class name="~org\.apache.flink\.statefun\.sdk\.kinesis\.egress\.EgressRecord\$Builder"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
<!-- 3rd party -->
<Match>