You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/19 04:22:12 UTC

[flink-statefun] 05/07: [FLINK-16124] [kinesis] Add Java Kinesis Egress SDK classes

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit f4823097e11c45c9c289100bb6e046f215aff7e8
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Mar 16 11:24:05 2020 +0800

    [FLINK-16124] [kinesis] Add Java Kinesis Egress SDK classes
---
 .../flink/statefun/sdk/kinesis/KinesisIOTypes.java |   3 +
 .../statefun/sdk/kinesis/egress/EgressRecord.java  |  98 ++++++++++++++
 .../sdk/kinesis/egress/KinesisEgressBuilder.java   | 149 +++++++++++++++++++++
 .../KinesisEgressSerializer.java}                  |  23 +++-
 .../sdk/kinesis/egress/KinesisEgressSpec.java      |  81 +++++++++++
 .../sdk/kinesis/KinesisEgressBuilderTest.java      |  58 ++++++++
 tools/maven/spotbugs-exclude.xml                   |   8 ++
 7 files changed, 413 insertions(+), 7 deletions(-)

diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
index 4b57b0a..346451e 100644
--- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.statefun.sdk.kinesis;
 
+import org.apache.flink.statefun.sdk.EgressType;
 import org.apache.flink.statefun.sdk.IngressType;
 
 public final class KinesisIOTypes {
@@ -25,4 +26,8 @@ public final class KinesisIOTypes {

   public static final IngressType UNIVERSAL_INGRESS_TYPE =
       new IngressType("statefun.kinesis.io", "universal-ingress");
+
+  /** The {@link EgressType} reported by every {@code KinesisEgressSpec}. */
+  public static final EgressType UNIVERSAL_EGRESS_TYPE =
+      new EgressType("statefun.kinesis.io", "universal-egress");
 }
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/EgressRecord.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/EgressRecord.java
new file mode 100644
index 0000000..ac8c5bb
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/EgressRecord.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.egress;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/**
+ * A record to be written to AWS Kinesis, as produced by a {@link KinesisEgressSerializer}.
+ *
+ * <p>Instances are created via {@link #newBuilder()}. The data, target stream and partition key
+ * are mandatory, while the explicit hash key is optional.
+ *
+ * <p>The data array is stored and returned as-is (it is not defensively copied), so it should not
+ * be modified once it has been handed to the builder.
+ */
+public final class EgressRecord {
+
+  private final byte[] data;
+  private final String stream;
+  private final String partitionKey;
+  @Nullable private final String explicitHashKey;
+
+  /** @return A builder for an {@link EgressRecord}. */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  private EgressRecord(
+      byte[] data, String stream, String partitionKey, @Nullable String explicitHashKey) {
+    this.data = Objects.requireNonNull(data, "data bytes");
+    this.stream = Objects.requireNonNull(stream, "target stream");
+    this.partitionKey = Objects.requireNonNull(partitionKey, "partition key");
+    this.explicitHashKey = explicitHashKey;
+  }
+
+  /** @return data bytes to write. The internal array is returned without copying. */
+  public byte[] getData() {
+    return data;
+  }
+
+  /** @return target AWS Kinesis stream to write to. */
+  public String getStream() {
+    return stream;
+  }
+
+  /** @return partition key to use when writing the record to AWS Kinesis. */
+  public String getPartitionKey() {
+    return partitionKey;
+  }
+
+  /**
+   * @return explicit hash key to use when writing the record to AWS Kinesis, or {@code null} if
+   *     none was set.
+   */
+  @Nullable
+  public String getExplicitHashKey() {
+    return explicitHashKey;
+  }
+
+  /**
+   * Builder for {@link EgressRecord}.
+   *
+   * <p>{@link #build()} throws a {@link NullPointerException} if the data, the stream or the
+   * partition key has not been set.
+   */
+  public static final class Builder {
+    private byte[] data;
+    private String stream;
+    private String partitionKey;
+    @Nullable private String explicitHashKey;
+
+    private Builder() {}
+
+    /**
+     * Sets the bytes to write. The array is not copied and should not be modified afterwards.
+     *
+     * @param data the bytes to write.
+     * @return this builder.
+     */
+    public Builder withData(byte[] data) {
+      this.data = data;
+      return this;
+    }
+
+    /**
+     * Sets the name of the AWS Kinesis stream to write to.
+     *
+     * @param stream the target stream.
+     * @return this builder.
+     */
+    public Builder withStream(String stream) {
+      this.stream = stream;
+      return this;
+    }
+
+    /**
+     * Sets the partition key to use when writing the record.
+     *
+     * @param partitionKey the partition key.
+     * @return this builder.
+     */
+    public Builder withPartitionKey(String partitionKey) {
+      this.partitionKey = partitionKey;
+      return this;
+    }
+
+    /**
+     * Sets the optional explicit hash key to use when writing the record.
+     *
+     * @param explicitHashKey the explicit hash key, or {@code null} for none.
+     * @return this builder.
+     */
+    public Builder withExplicitHashKey(@Nullable String explicitHashKey) {
+      this.explicitHashKey = explicitHashKey;
+      return this;
+    }
+
+    /**
+     * @return a new {@link EgressRecord} with the values set on this builder.
+     * @throws NullPointerException if the data, stream or partition key is missing.
+     */
+    public EgressRecord build() {
+      return new EgressRecord(data, stream, partitionKey, explicitHashKey);
+    }
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressBuilder.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressBuilder.java
new file mode 100644
index 0000000..98406c7
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressBuilder.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.egress;
+
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+/**
+ * A builder for creating an {@link EgressSpec} for writing data to AWS Kinesis.
+ *
+ * <p>A serializer must be set via {@link #withSerializer(Class)}; all other settings have
+ * defaults.
+ *
+ * @param <T> The type written to AWS Kinesis.
+ */
+public final class KinesisEgressBuilder<T> {
+
+  private final EgressIdentifier<T> id;
+
+  private Class<? extends KinesisEgressSerializer<T>> serializerClass;
+  private int maxOutstandingRecords = 1000;
+  private AwsRegion awsRegion = AwsRegion.fromDefaultProviderChain();
+  private AwsCredentials awsCredentials = AwsCredentials.fromDefaultProviderChain();
+  private final Properties clientConfigurationProperties = new Properties();
+
+  private KinesisEgressBuilder(EgressIdentifier<T> id) {
+    this.id = Objects.requireNonNull(id);
+  }
+
+  /**
+   * @param id A unique egress identifier.
+   * @param <T> The type written to AWS Kinesis.
+   * @return A new {@link KinesisEgressBuilder}.
+   */
+  public static <T> KinesisEgressBuilder<T> forIdentifier(EgressIdentifier<T> id) {
+    return new KinesisEgressBuilder<>(id);
+  }
+
+  /**
+   * Sets the serializer that converts values into {@link EgressRecord}s. This setting is
+   * mandatory.
+   *
+   * @param serializerClass The serializer used to convert from Java objects to Kinesis's byte
+   *     messages.
+   * @return this builder.
+   */
+  public KinesisEgressBuilder<T> withSerializer(
+      Class<? extends KinesisEgressSerializer<T>> serializerClass) {
+    this.serializerClass = Objects.requireNonNull(serializerClass);
+    return this;
+  }
+
+  /**
+   * The AWS region to connect to. By default, AWS's default provider chain is consulted.
+   *
+   * @param awsRegion The AWS region to connect to.
+   * @return this builder.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-region-selection.html#automatically-determine-the-aws-region-from-the-environment">Automatically
+   *     Determine the AWS Region from the Environment</a>.
+   * @see AwsRegion
+   */
+  public KinesisEgressBuilder<T> withAwsRegion(AwsRegion awsRegion) {
+    this.awsRegion = Objects.requireNonNull(awsRegion);
+    return this;
+  }
+
+  /**
+   * The AWS region to connect to, specified by the AWS region's unique id. By default, AWS's
+   * default provider chain is consulted.
+   *
+   * @param regionName The unique id of the AWS region to connect to.
+   * @return this builder.
+   */
+  public KinesisEgressBuilder<T> withAwsRegion(String regionName) {
+    this.awsRegion = AwsRegion.ofId(Objects.requireNonNull(regionName));
+    return this;
+  }
+
+  /**
+   * The AWS credentials to use. By default, AWS's default provider chain is consulted.
+   *
+   * @param awsCredentials The AWS credentials to use.
+   * @return this builder.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default">Using
+   *     the Default Credential Provider Chain</a>.
+   * @see AwsCredentials
+   */
+  public KinesisEgressBuilder<T> withAwsCredentials(AwsCredentials awsCredentials) {
+    this.awsCredentials = Objects.requireNonNull(awsCredentials);
+    return this;
+  }
+
+  /**
+   * The maximum number of buffered outstanding records, before backpressure is applied by the
+   * egress. Defaults to 1000.
+   *
+   * @param maxOutstandingRecords the maximum number of buffered outstanding records
+   * @return this builder.
+   * @throws IllegalArgumentException if {@code maxOutstandingRecords} is not positive.
+   */
+  public KinesisEgressBuilder<T> withMaxOutstandingRecords(int maxOutstandingRecords) {
+    if (maxOutstandingRecords <= 0) {
+      throw new IllegalArgumentException("Max outstanding records must be larger than 0.");
+    }
+    this.maxOutstandingRecords = maxOutstandingRecords;
+    return this;
+  }
+
+  /**
+   * Sets an AWS client configuration property to be used by the egress.
+   *
+   * <p>Supported values are properties of AWS's <a
+   * href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/latest/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration</a>.
+   * Please see <a
+   * href="https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties">Default
+   * Configuration Properties</a> for a full list of the keys.
+   *
+   * @param key the property to set.
+   * @param value the value for the property.
+   * @return this builder.
+   * @see <a
+   *     href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.amazonaws.ClientConfiguration</a>.
+   */
+  public KinesisEgressBuilder<T> withClientConfigurationProperty(String key, String value) {
+    Objects.requireNonNull(key);
+    Objects.requireNonNull(value);
+    this.clientConfigurationProperties.setProperty(key, value);
+    return this;
+  }
+
+  /**
+   * @return A new {@link KinesisEgressSpec}.
+   * @throws NullPointerException if no serializer was set via {@link #withSerializer(Class)}.
+   */
+  public KinesisEgressSpec<T> build() {
+    Objects.requireNonNull(
+        serializerClass, "A serializer class must be set via withSerializer() before building.");
+    return new KinesisEgressSpec<>(
+        id,
+        serializerClass,
+        maxOutstandingRecords,
+        awsRegion,
+        awsCredentials,
+        clientConfigurationProperties);
+  }
+}
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
similarity index 58%
copy from statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
copy to statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
index 4b57b0a..46f605d 100644
--- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/KinesisIOTypes.java
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
@@ -15,14 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.statefun.sdk.kinesis;
+package org.apache.flink.statefun.sdk.kinesis.egress;
 
-import org.apache.flink.statefun.sdk.IngressType;
+import java.io.Serializable;
 
-public final class KinesisIOTypes {
-
-  private KinesisIOTypes() {}
+/**
+ * Defines how to serialize values of type {@code T} into {@link EgressRecord}s to be written to AWS
+ * Kinesis. Implementations are passed by class to {@link KinesisEgressBuilder#withSerializer}.
+ *
+ * @param <T> the type of values being written.
+ */
+public interface KinesisEgressSerializer<T> extends Serializable {

-  public static final IngressType UNIVERSAL_INGRESS_TYPE =
-      new IngressType("statefun.kinesis.io", "universal-ingress");
+  /**
+   * Serializes an output value into an {@link EgressRecord} to be written to AWS Kinesis.
+   *
+   * @param value the output value to write.
+   * @return the {@link EgressRecord} to be written to AWS Kinesis.
+   */
+  EgressRecord serialize(T value);
 }
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSpec.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSpec.java
new file mode 100644
index 0000000..7f6b369
--- /dev/null
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSpec.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis.egress;
+
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.flink.statefun.sdk.EgressType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
+import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
+
+/**
+ * An {@link EgressSpec} for writing to AWS Kinesis, created via {@link KinesisEgressBuilder}.
+ *
+ * @param <T> The type written to AWS Kinesis.
+ */
+public final class KinesisEgressSpec<T> implements EgressSpec<T> {
+  private final EgressIdentifier<T> egressIdentifier;
+  private final Class<? extends KinesisEgressSerializer<T>> serializerClass;
+  private final int maxOutstandingRecords;
+  private final AwsRegion awsRegion;
+  private final AwsCredentials awsCredentials;
+  private final Properties clientConfigurationProperties;
+
+  KinesisEgressSpec(
+      EgressIdentifier<T> egressIdentifier,
+      Class<? extends KinesisEgressSerializer<T>> serializerClass,
+      int maxOutstandingRecords,
+      AwsRegion awsRegion,
+      AwsCredentials awsCredentials,
+      Properties clientConfigurationProperties) {
+    this.egressIdentifier = Objects.requireNonNull(egressIdentifier);
+    this.serializerClass = Objects.requireNonNull(serializerClass);
+    this.maxOutstandingRecords = maxOutstandingRecords;
+    this.awsRegion = Objects.requireNonNull(awsRegion);
+    this.awsCredentials = Objects.requireNonNull(awsCredentials);
+    // Defensive copy: the caller (KinesisEgressBuilder) keeps its own instance and may continue
+    // to modify it, which must not affect specs that were already built.
+    this.clientConfigurationProperties = new Properties();
+    this.clientConfigurationProperties.putAll(
+        Objects.requireNonNull(clientConfigurationProperties));
+  }
+
+  @Override
+  public EgressIdentifier<T> id() {
+    return egressIdentifier;
+  }
+
+  @Override
+  public EgressType type() {
+    return KinesisIOTypes.UNIVERSAL_EGRESS_TYPE;
+  }
+
+  /** @return the class of the serializer that converts values into {@link EgressRecord}s. */
+  public Class<? extends KinesisEgressSerializer<T>> serializerClass() {
+    return serializerClass;
+  }
+
+  /** @return the maximum number of buffered outstanding records. */
+  public int maxOutstandingRecords() {
+    return maxOutstandingRecords;
+  }
+
+  /** @return the AWS region to connect to. */
+  public AwsRegion awsRegion() {
+    return awsRegion;
+  }
+
+  /** @return the AWS credentials to use. */
+  public AwsCredentials awsCredentials() {
+    return awsCredentials;
+  }
+
+  /**
+   * @return the AWS client configuration properties. This is the spec's own copy, taken at
+   *     construction time; it should not be modified.
+   */
+  public Properties clientConfigurationProperties() {
+    return clientConfigurationProperties;
+  }
+}
diff --git a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisEgressBuilderTest.java b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisEgressBuilderTest.java
new file mode 100644
index 0000000..e4406c4
--- /dev/null
+++ b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisEgressBuilderTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kinesis;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
+import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
+import org.junit.Test;
+
+/** Tests for {@link KinesisEgressBuilder}. */
+public class KinesisEgressBuilderTest {
+
+  private static final EgressIdentifier<String> ID =
+      new EgressIdentifier<>("namespace", "name", String.class);
+
+  @Test
+  public void exampleUsage() {
+    final KinesisEgressSpec<String> kinesisEgressSpec =
+        KinesisEgressBuilder.forIdentifier(ID).withSerializer(TestSerializer.class).build();
+
+    assertThat(kinesisEgressSpec.id(), is(ID));
+    assertThat(kinesisEgressSpec.type(), is(KinesisIOTypes.UNIVERSAL_EGRESS_TYPE));
+    assertTrue(kinesisEgressSpec.awsRegion().isDefault());
+    assertTrue(kinesisEgressSpec.awsCredentials().isDefault());
+    assertEquals(TestSerializer.class, kinesisEgressSpec.serializerClass());
+    assertEquals(1000, kinesisEgressSpec.maxOutstandingRecords());
+    assertTrue(kinesisEgressSpec.clientConfigurationProperties().isEmpty());
+  }
+
+  @Test
+  public void customConfiguration() {
+    final KinesisEgressSpec<String> kinesisEgressSpec =
+        KinesisEgressBuilder.forIdentifier(ID)
+            .withSerializer(TestSerializer.class)
+            .withMaxOutstandingRecords(500)
+            .withClientConfigurationProperty("RecordTtl", "30000")
+            .build();
+
+    assertEquals(500, kinesisEgressSpec.maxOutstandingRecords());
+    assertEquals(
+        "30000", kinesisEgressSpec.clientConfigurationProperties().getProperty("RecordTtl"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void nonPositiveMaxOutstandingRecordsIsRejected() {
+    KinesisEgressBuilder.forIdentifier(ID).withMaxOutstandingRecords(0);
+  }
+
+  /** Serializer stub; the tests only inspect its class, never invoke it. */
+  private static final class TestSerializer implements KinesisEgressSerializer<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public EgressRecord serialize(String value) {
+      return null;
+    }
+  }
+}
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
index 9bd3076..694e52d 100644
--- a/tools/maven/spotbugs-exclude.xml
+++ b/tools/maven/spotbugs-exclude.xml
@@ -74,6 +74,14 @@ under the License.
         <Class name="~org\.apache.flink\.statefun\.sdk\.kinesis\.ingress\.IngressRecord\$Builder"/>
         <Bug pattern="EI_EXPOSE_REP2"/>
     </Match>
+    <Match>
+        <Class name="~org\.apache.flink\.statefun\.sdk\.kinesis\.egress\.EgressRecord"/>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
+    <Match>
+        <Class name="~org\.apache.flink\.statefun\.sdk\.kinesis\.egress\.EgressRecord\$Builder"/>
+        <Bug pattern="EI_EXPOSE_REP2"/>
+    </Match>
 
     <!-- 3rd party -->
     <Match>