You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:18 UTC
[28/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
new file mode 100644
index 0000000..9e0c7e8
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-kinesis_2.10</artifactId>
+ <name>flink-connector-kinesis</name>
+ <properties>
+ <aws.sdk.version>1.10.71</aws.sdk.version>
+ <aws.kinesis-kcl.version>1.6.2</aws.kinesis-kcl.version>
+ <aws.kinesis-kpl.version>0.10.2</aws.kinesis-kpl.version>
+ </properties>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tests_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Note:
+ The below dependencies are licenced under the Amazon Software License.
+ Flink includes the "flink-connector-kinesis" only as an optional dependency for that reason.
+ -->
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-kinesis</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>amazon-kinesis-producer</artifactId>
+ <version>${aws.kinesis-kpl.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>amazon-kinesis-client</artifactId>
+ <version>${aws.kinesis-kcl.version}</version>
+ <!--
+ We're excluding the below from the KCL since we'll only be using the
+ com.amazonaws.services.kinesis.clientlibrary.types.UserRecord class, which will not need these dependencies.
+ -->
+ <exclusions>
+ <exclusion>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-dynamodb</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-cloudwatch</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <artifactSet combine.children="append">
+ <includes>
+ <include>com.amazonaws:*</include>
+ <include>com.google.protobuf:*</include>
+ </includes>
+ </artifactSet>
+ <relocations combine.children="override">
+ <!-- DO NOT RELOCATE GUAVA IN THIS PACKAGE -->
+ <relocation>
+ <pattern>org.objectweb.asm</pattern>
+ <shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.protobuf</pattern>
+ <shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.amazonaws</pattern>
+ <shadedPattern>org.apache.flink.kinesis.shaded.com.amazonaws</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
new file mode 100644
index 0000000..a62dc10
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
+ * streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is
+ * responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
+ * change as shards are closed and created by Kinesis.
+ *
+ * <p>To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.</p>
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+ implements CheckpointedAsynchronously<HashMap<KinesisStreamShard, SequenceNumber>>, ResultTypeQueryable<T> {
+
+ private static final long serialVersionUID = 4724006128720664870L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+ // ------------------------------------------------------------------------
+ // Consumer properties
+ // ------------------------------------------------------------------------
+
+ /** The names of the Kinesis streams that we will be consuming from */
+ private final List<String> streams;
+
+ /** Properties to parametrize settings such as AWS service region, initial position in stream,
+ * shard list retrieval behaviours, etc */
+ private final Properties configProps;
+
+ /** User supplied deseriliazation schema to convert Kinesis byte messages to Flink objects */
+ private final KinesisDeserializationSchema<T> deserializer;
+
+ // ------------------------------------------------------------------------
+ // Runtime state
+ // ------------------------------------------------------------------------
+
+ /** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */
+ private transient KinesisDataFetcher<T> fetcher;
+
+ /** The sequence numbers in the last state snapshot of this subtask */
+ private transient HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot;
+
+ /** The sequence numbers to restore to upon restore from failure */
+ private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore;
+
+ private volatile boolean running = true;
+
+
+ // ------------------------------------------------------------------------
+ // Constructors
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new Flink Kinesis Consumer.
+ *
+ * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
+ * from are configured with a {@link Properties} instance.</p>
+ *
+ * @param stream
+ * The single AWS Kinesis stream to read from.
+ * @param deserializer
+ * The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
+ * @param configProps
+ * The properties used to configure AWS credentials, AWS region, and initial starting position.
+ */
+ public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps) {
+ this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer), configProps);
+ }
+
+ /**
+ * Creates a new Flink Kinesis Consumer.
+ *
+ * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
+ * from are configured with a {@link Properties} instance.</p>
+ *
+ * @param stream
+ * The single AWS Kinesis stream to read from.
+ * @param deserializer
+ * The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
+ * @param configProps
+ * The properties used to configure AWS credentials, AWS region, and initial starting position.
+ */
+ public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
+ this(Collections.singletonList(stream), deserializer, configProps);
+ }
+
+ /**
+ * Creates a new Flink Kinesis Consumer.
+ *
+ * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
+ * from are configured with a {@link Properties} instance.</p>
+ *
+ * @param streams
+ * The AWS Kinesis streams to read from.
+ * @param deserializer
+ * The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
+ * @param configProps
+ * The properties used to configure AWS credentials, AWS region, and initial starting position.
+ */
+ public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
+ checkNotNull(streams, "streams can not be null");
+ checkArgument(streams.size() != 0, "must be consuming at least 1 stream");
+ checkArgument(!streams.contains(""), "stream names cannot be empty Strings");
+ this.streams = streams;
+
+ this.configProps = checkNotNull(configProps, "configProps can not be null");
+
+ // check the configuration properties for any conflicting settings
+ KinesisConfigUtil.validateConsumerConfiguration(this.configProps);
+
+ this.deserializer = checkNotNull(deserializer, "deserializer can not be null");
+
+ if (LOG.isInfoEnabled()) {
+ StringBuilder sb = new StringBuilder();
+ for (String stream : streams) {
+ sb.append(stream).append(", ");
+ }
+ LOG.info("Flink Kinesis Consumer is going to read the following streams: {}", sb.toString());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Source life cycle
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ // restore to the last known sequence numbers from the latest complete snapshot
+ if (sequenceNumsToRestore != null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Subtask {} is restoring sequence numbers {} from previous checkpointed state",
+ getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore.toString());
+ }
+
+ // initialize sequence numbers with restored state
+ lastStateSnapshot = sequenceNumsToRestore;
+ } else {
+ // start fresh with empty sequence numbers if there are no snapshots to restore from.
+ lastStateSnapshot = new HashMap<>();
+ }
+ }
+
+ @Override
+ public void run(SourceContext<T> sourceContext) throws Exception {
+
+ // all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
+ // shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
+ // can potentially have new shards to subscribe to later on
+ fetcher = new KinesisDataFetcher<>(
+ streams, sourceContext, getRuntimeContext(), configProps, deserializer);
+
+ boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
+ fetcher.setIsRestoringFromFailure(isRestoringFromFailure);
+
+ // if we are restoring from a checkpoint, we iterate over the restored
+ // state and accordingly seed the fetcher with subscribed shards states
+ if (isRestoringFromFailure) {
+ for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
+ fetcher.advanceLastDiscoveredShardOfStream(
+ restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
+ " starting state set to the restored sequence number {}",
+ getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue());
+ }
+ fetcher.registerNewSubscribedShardState(
+ new KinesisStreamShardState(restored.getKey(), restored.getValue()));
+ }
+ }
+
+ // check that we are running before starting the fetcher
+ if (!running) {
+ return;
+ }
+
+ // start the fetcher loop. The fetcher will stop running only when cancel() or
+ // close() is called, or an error is thrown by threads created by the fetcher
+ fetcher.runFetcher();
+
+ // check that the fetcher has terminated before fully closing
+ fetcher.awaitTermination();
+ sourceContext.close();
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+
+ KinesisDataFetcher fetcher = this.fetcher;
+ this.fetcher = null;
+
+ // this method might be called before the subtask actually starts running,
+ // so we must check if the fetcher is actually created
+ if (fetcher != null) {
+ try {
+ // interrupt the fetcher of any work
+ fetcher.shutdownFetcher();
+ fetcher.awaitTermination();
+ } catch (Exception e) {
+ LOG.warn("Error while closing Kinesis data fetcher", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ cancel();
+ super.close();
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializer.getProducedType();
+ }
+
+ // ------------------------------------------------------------------------
+ // State Snapshot & Restore
+ // ------------------------------------------------------------------------
+
+ @Override
+ public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ if (lastStateSnapshot == null) {
+ LOG.debug("snapshotState() requested on not yet opened source; returning null.");
+ return null;
+ }
+
+ if (fetcher == null) {
+ LOG.debug("snapshotState() requested on not yet running source; returning null.");
+ return null;
+ }
+
+ if (!running) {
+ LOG.debug("snapshotState() called on closed source; returning null.");
+ return null;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Snapshotting state ...");
+ }
+
+ lastStateSnapshot = fetcher.snapshotState();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+ lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
+ }
+
+ return lastStateSnapshot;
+ }
+
+ @Override
+ public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
+ sequenceNumsToRestore = restoredState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
new file mode 100644
index 0000000..579bd6b
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.PropertiesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+ /** Properties to parametrize settings such as AWS service region, access key etc. */
+ private final Properties configProps;
+
+ /* Flag controlling the error behavior of the producer */
+ private boolean failOnError = false;
+
+ /* Name of the default stream to produce to. Can be overwritten by the serialization schema */
+ private String defaultStream;
+
+ /* Default partition id. Can be overwritten by the serialization schema */
+ private String defaultPartition;
+
+ /* Schema for turning the OUT type into a byte array. */
+ private final KinesisSerializationSchema<OUT> schema;
+
+ /* Optional custom partitioner */
+ private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+ // --------------------------- Runtime fields ---------------------------
+
+
+ /* Our Kinesis instance for each parallel Flink sink */
+ private transient KinesisProducer producer;
+
+ /* Callback handling failures */
+ private transient FutureCallback<UserRecordResult> callback;
+
+ /* Field for async exception */
+ private transient volatile Throwable thrownException;
+
+
+ // --------------------------- Initialization and configuration ---------------------------
+
+
+ /**
+ * Create a new FlinkKinesisProducer.
+ * This is a constructor supporting Flink's {@see SerializationSchema}.
+ *
+ * @param schema Serialization schema for the data type
+ * @param configProps The properties used to configure AWS credentials and AWS region
+ */
+ public FlinkKinesisProducer(final SerializationSchema<OUT> schema, Properties configProps) {
+
+ // create a simple wrapper for the serialization schema
+ this(new KinesisSerializationSchema<OUT>() {
+ @Override
+ public ByteBuffer serialize(OUT element) {
+ // wrap into ByteBuffer
+ return ByteBuffer.wrap(schema.serialize(element));
+ }
+ // use default stream and hash key
+ @Override
+ public String getTargetStream(OUT element) {
+ return null;
+ }
+ }, configProps);
+ }
+
+ /**
+ * Create a new FlinkKinesisProducer.
+ * This is a constructor supporting {@see KinesisSerializationSchema}.
+ *
+ * @param schema Kinesis serialization schema for the data type
+ * @param configProps The properties used to configure AWS credentials and AWS region
+ */
+ public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps) {
+ this.configProps = checkNotNull(configProps, "configProps can not be null");
+
+ // check the configuration properties for any conflicting settings
+ KinesisConfigUtil.validateProducerConfiguration(this.configProps);
+
+ ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
+ this.schema = schema;
+ }
+
+ /**
+ * If set to true, the producer will immediately fail with an exception on any error.
+ * Otherwise, the errors are logged and the producer goes on.
+ *
+ * @param failOnError Error behavior flag
+ */
+ public void setFailOnError(boolean failOnError) {
+ this.failOnError = failOnError;
+ }
+
+ /**
+ * Set a default stream name.
+ * @param defaultStream Name of the default Kinesis stream
+ */
+ public void setDefaultStream(String defaultStream) {
+ this.defaultStream = defaultStream;
+ }
+
+ /**
+ * Set default partition id
+ * @param defaultPartition Name of the default partition
+ */
+ public void setDefaultPartition(String defaultPartition) {
+ this.defaultPartition = defaultPartition;
+ }
+
+ public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
+ Objects.requireNonNull(partitioner);
+ ClosureCleaner.ensureSerializable(partitioner);
+ this.customPartitioner = partitioner;
+ }
+
+
+ // --------------------------- Lifecycle methods ---------------------------
+
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+ producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
+ producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
+ if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
+ producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
+ ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
+ }
+ if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
+ producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
+ ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
+ }
+
+ producer = new KinesisProducer(producerConfig);
+ callback = new FutureCallback<UserRecordResult>() {
+ @Override
+ public void onSuccess(UserRecordResult result) {
+ if (!result.isSuccessful()) {
+ if(failOnError) {
+ thrownException = new RuntimeException("Record was not sent successful");
+ } else {
+ LOG.warn("Record was not sent successful");
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (failOnError) {
+ thrownException = t;
+ } else {
+ LOG.warn("An exception occurred while processing a record", t);
+ }
+ }
+ };
+
+ if (this.customPartitioner != null) {
+ this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
+ }
+
+ @Override
+ public void invoke(OUT value) throws Exception {
+ if (this.producer == null) {
+ throw new RuntimeException("Kinesis producer has been closed");
+ }
+ if (thrownException != null) {
+ String errorMessages = "";
+ if (thrownException instanceof UserRecordFailedException) {
+ List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
+ for (Attempt attempt: attempts) {
+ if (attempt.getErrorMessage() != null) {
+ errorMessages += attempt.getErrorMessage() +"\n";
+ }
+ }
+ }
+ if (failOnError) {
+ throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
+ } else {
+ LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages);
+ thrownException = null; // reset
+ }
+ }
+
+ String stream = defaultStream;
+ String partition = defaultPartition;
+
+ ByteBuffer serialized = schema.serialize(value);
+
+ // maybe set custom stream
+ String customStream = schema.getTargetStream(value);
+ if (customStream != null) {
+ stream = customStream;
+ }
+
+ String explicitHashkey = null;
+ // maybe set custom partition
+ if (customPartitioner != null) {
+ partition = customPartitioner.getPartitionId(value);
+ explicitHashkey = customPartitioner.getExplicitHashKey(value);
+ }
+
+ if (stream == null) {
+ if (failOnError) {
+ throw new RuntimeException("No target stream set");
+ } else {
+ LOG.warn("No target stream set. Skipping record");
+ return;
+ }
+ }
+
+ ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
+ Futures.addCallback(cb, callback);
+ }
+
+ @Override
+ public void close() throws Exception {
+ LOG.info("Closing producer");
+ super.close();
+ KinesisProducer kp = this.producer;
+ this.producer = null;
+ if (kp != null) {
+ LOG.info("Flushing outstanding {} records", kp.getOutstandingRecordsCount());
+ // try to flush all outstanding records
+ while (kp.getOutstandingRecordsCount() > 0) {
+ kp.flush();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ LOG.warn("Flushing was interrupted.");
+ // stop the blocking flushing and destroy producer immediately
+ break;
+ }
+ }
+ LOG.info("Flushing done. Destroying producer instance.");
+ kp.destroy();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
new file mode 100644
index 0000000..bd23abe
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import java.io.Serializable;
+
/**
 * Base class for user-defined partitioners consulted by the Flink Kinesis producer.
 *
 * <p>For each record, the producer asks the partitioner for a partition id and an optional explicit hash key.
 * Implementations must be serializable, because this type implements {@link Serializable}.
 *
 * @param <T> the type of element being partitioned
 */
public abstract class KinesisPartitioner<T> implements Serializable {

	/**
	 * Optional hook that receives this instance's position among the parallel instances.
	 * The default implementation ignores both values.
	 *
	 * @param indexOfThisSubtask Index of this partitioner instance
	 * @param numberOfParallelSubtasks Total number of parallel instances
	 */
	public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
		// no-op by default; override when the subtask layout matters
	}

	/**
	 * Computes the partition id for the given element.
	 *
	 * @param element Element to partition
	 * @return A string representing the partition id
	 */
	public abstract String getPartitionId(T element);

	/**
	 * Optionally supplies an explicit hash key for the given element.
	 *
	 * @param element Element to get the hash key for
	 * @return the hash key for the element, or {@code null} (the default) when none is set
	 */
	public String getExplicitHashKey(T element) {
		// null means "no explicit hash key"
		return null;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
new file mode 100644
index 0000000..01d4f00
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+
+/**
+ * Configuration keys for AWS service usage.
+ */
+public class AWSConfigConstants {
+
+	/**
+	 * Possible configuration values for the type of credential provider to use when accessing AWS Kinesis.
+	 * Internally, a corresponding implementation of {@link AWSCredentialsProvider} will be used.
+	 *
+	 * <p>Note: the declaration order of the constants is part of the enum's observable behavior
+	 * ({@code values()}, {@code ordinal()}) and should not be changed.
+	 */
+	public enum CredentialProvider {
+
+		/** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials. */
+		ENV_VAR,
+
+		/** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials. */
+		SYS_PROP,
+
+		/** Use an AWS credentials profile file to create the AWS credentials. */
+		PROFILE,
+
+		/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */
+		BASIC,
+
+		/**
+		 * A credentials provider chain will be used that searches for credentials in this order:
+		 * {@link #ENV_VAR}, {@link #SYS_PROP}, {@link #PROFILE}, and finally instance profile
+		 * credentials from the AWS instance metadata.
+		 */
+		AUTO,
+	}
+
+	/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
+	public static final String AWS_REGION = "aws.region";
+
+	/** The AWS access key ID to use when setting credentials provider type to BASIC. */
+	public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
+
+	/** The AWS secret key to use when setting credentials provider type to BASIC. */
+	public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
+
+	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
+	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+
+	/** Optional configuration for profile path if credential provider type is set to be PROFILE. */
+	public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
+
+	/** Optional configuration for profile name if credential provider type is set to be PROFILE. */
+	public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
+
+	/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
+	public static final String AWS_ENDPOINT = "aws.endpoint";
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
new file mode 100644
index 0000000..76c20ed
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+
+/**
+ * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}.
+ */
+public class ConsumerConfigConstants extends AWSConfigConstants {
+
+	/**
+	 * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used
+	 * when the consumer tasks retrieve the first shard iterator for each Kinesis shard.
+	 */
+	public enum InitialPosition {
+
+		/** Start reading from the earliest possible record in the stream (excluding expired data records). */
+		TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
+
+		/** Start reading from the latest incoming record. */
+		LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
+
+		/** Sentinel sequence number representing this initial position; fixed per constant, hence final. */
+		private final SentinelSequenceNumber sentinelSequenceNumber;
+
+		InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
+			this.sentinelSequenceNumber = sentinelSequenceNumber;
+		}
+
+		/**
+		 * @return the sentinel sequence number that represents this initial position
+		 */
+		public SentinelSequenceNumber toSentinelSequenceNumber() {
+			return this.sentinelSequenceNumber;
+		}
+	}
+
+	/** The initial position to start reading Kinesis streams from (LATEST is used if not set). */
+	public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
+
+	/** The base backoff time between each describeStream attempt. */
+	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
+
+	/** The maximum backoff time between each describeStream attempt. */
+	public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";
+
+	/** The power constant for exponential backoff between each describeStream attempt. */
+	public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";
+
+	/** The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard. */
+	public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
+
+	/** The maximum number of getRecords attempts if we get a ProvisionedThroughputExceededException. */
+	public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";
+
+	/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
+	public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";
+
+	/** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
+	public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";
+
+	/** The power constant for exponential backoff between each getRecords attempt. */
+	public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";
+
+	/** The interval between each getRecords request to an AWS Kinesis shard in milliseconds. */
+	public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
+
+	/** The maximum number of getShardIterator attempts if we get a ProvisionedThroughputExceededException. */
+	public static final String SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries";
+
+	/** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException. */
+	public static final String SHARD_GETITERATOR_BACKOFF_BASE = "flink.shard.getiterator.backoff.base";
+
+	/** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException. */
+	public static final String SHARD_GETITERATOR_BACKOFF_MAX = "flink.shard.getiterator.backoff.max";
+
+	/** The power constant for exponential backoff between each getShardIterator attempt. */
+	public static final String SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getiterator.backoff.expconst";
+
+	/** The interval between each attempt to discover new shards, in milliseconds. */
+	public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
+
+	// ------------------------------------------------------------------------
+	//  Default values for consumer configuration
+	// ------------------------------------------------------------------------
+
+	public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString();
+
+	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
+
+	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
+
+	public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+	public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
+
+	public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
+
+	public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L;
+
+	public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L;
+
+	public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+	public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0L;
+
+	public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
+
+	public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L;
+
+	public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L;
+
+	public static final double DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+	public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;
+
+	/**
+	 * To avoid shard iterators expiring in {@link ShardConsumer}s, the configured getRecords interval
+	 * cannot exceed 5 minutes (300000 ms), which is the expiration time of retrieved shard iterators.
+	 */
+	public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
new file mode 100644
index 0000000..1edddfc
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+
+/**
+ * Optional producer specific configuration keys for {@link FlinkKinesisProducer}.
+ * The general AWS keys (region, credentials, endpoint) are inherited from {@link AWSConfigConstants}.
+ */
+public class ProducerConfigConstants extends AWSConfigConstants {
+
+	/** Upper bound on how many items are packed into a single aggregated record. */
+	public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
+
+	/** Upper bound on how many items are packed into a single PutRecords request. */
+	public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
new file mode 100644
index 0000000..55668c6
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.util.Properties;
+
+/**
+ * This is an example on how to consume data from Kinesis.
+ *
+ * <p>Required arguments: {@code --region}, {@code --accesskey}, {@code --secretkey}.
+ * Optional argument: {@code --stream}, the Kinesis stream to read (defaults to {@code flink-test}).
+ * Every consumed record is passed to {@code DataStream#print()}.
+ */
+public class ConsumeFromKinesis {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(1);
+
+		Properties kinesisConsumerConfig = new Properties();
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey"));
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey"));
+
+		// the stream name used to be hard-coded; keep "flink-test" as the default for compatibility
+		String streamName = pt.get("stream", "flink-test");
+
+		DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
+			streamName,
+			new SimpleStringSchema(),
+			kinesisConsumerConfig));
+
+		kinesis.print();
+
+		see.execute();
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
new file mode 100644
index 0000000..d178137
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.util.Properties;
+
+/**
+ * This is an example on how to produce data into Kinesis.
+ *
+ * <p>Required arguments: {@code --region}, {@code --accessKey}, {@code --secretKey}.
+ * Optional argument: {@code --stream}, the default target stream (defaults to {@code flink-test}).
+ */
+public class ProduceIntoKinesis {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(1);
+
+		DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
+
+		Properties kinesisProducerConfig = new Properties();
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+
+		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
+			new SimpleStringSchema(), kinesisProducerConfig);
+
+		kinesis.setFailOnError(true);
+		// the stream name used to be hard-coded; keep "flink-test" as the default for compatibility
+		kinesis.setDefaultStream(pt.get("stream", "flink-test"));
+		kinesis.setDefaultPartition("0");
+
+		simpleStringStream.addSink(kinesis);
+
+		see.execute();
+	}
+
+	/**
+	 * Source that keeps emitting strings of the form {@code <sequence number>-<12 random letters>},
+	 * sleeping 10 ms between records, until {@link #cancel()} is called.
+	 */
+	public static class EventsGenerator implements SourceFunction<String> {
+
+		// Must be volatile: cancel() is called from a different thread than run(); without it the
+		// emitting loop is not guaranteed to ever observe the flag change and may never terminate.
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			long seq = 0;
+			while (running) {
+				Thread.sleep(10);
+				ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
new file mode 100644
index 0000000..a06fdca
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
+ * <ul>
+ * <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ * to the same subset of shards even after restoring)</li>
+ * <li>2. decide where in each discovered shard the fetcher should start consuming</li>
+ * <li>3. subscribe to shards by creating a single thread for each shard</li>
+ * </ul>
+ *
+ * <p>The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery),
+ * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
+ * by multiple threads, these operations should only be done using the handler methods provided in this class.
+ */
+public class KinesisDataFetcher<T> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+	// ------------------------------------------------------------------------
+	//  Consumer-wide settings
+	// ------------------------------------------------------------------------
+
+	/** Configuration properties for the Flink Kinesis Consumer (e.g. the initial stream position is read from here) */
+	private final Properties configProps;
+
+	/** The list of Kinesis streams that the consumer is subscribing to */
+	private final List<String> streams;
+
+	/**
+	 * The deserialization schema we will be using to convert Kinesis records to Flink objects.
+	 * Note that since this might not be thread-safe, {@link ShardConsumer}s using this must
+	 * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
+	 */
+	private final KinesisDeserializationSchema<T> deserializationSchema;
+
+	// ------------------------------------------------------------------------
+	//  Subtask-specific settings
+	// ------------------------------------------------------------------------
+
+	/** Runtime context of the subtask that this fetcher was created in */
+	private final RuntimeContext runtimeContext;
+
+	/** Parallelism of the consumer, taken from the runtime context at construction */
+	private final int totalNumberOfConsumerSubtasks;
+
+	/** Index of this consumer subtask, taken from the runtime context at construction */
+	private final int indexOfThisConsumerSubtask;
+
+	/**
+	 * This flag should be set by {@link FlinkKinesisConsumer} using
+	 * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)}.
+	 *
+	 * <p>When set, shards discovered at fetcher startup are seeded with the earliest sentinel sequence
+	 * number instead of the configured initial position.
+	 */
+	private boolean isRestoredFromFailure;
+
+	// ------------------------------------------------------------------------
+	//  Executor services to run created threads
+	// ------------------------------------------------------------------------
+
+	/** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
+	private final ExecutorService shardConsumersExecutor;
+
+	// ------------------------------------------------------------------------
+	//  Managed state, accessed and updated across multiple threads
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The last discovered shard ids of each subscribed stream, updated as the fetcher discovers new shards.
+	 * Note: this state will be updated if new shards are found when {@link KinesisDataFetcher#discoverNewShardsToSubscribe()} is called.
+	 */
+	private final Map<String, String> subscribedStreamsToLastDiscoveredShardIds;
+
+	/**
+	 * The shards, along with their last processed sequence numbers, that this fetcher is subscribed to. The fetcher
+	 * will add new subscribed shard states to this list as it discovers new shards. {@link ShardConsumer} threads update
+	 * the last processed sequence number of subscribed shards as they fetch and process records.
+	 *
+	 * <p>Note that since multiple {@link ShardConsumer} threads will be performing operations on this list, all operations
+	 * must be wrapped in synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} lock. For this purpose,
+	 * all threads must use the following thread-safe methods this class provides to operate on this list:
+	 * <ul>
+	 *     <li>{@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
+	 *     <li>{@link KinesisDataFetcher#updateState(int, SequenceNumber)}</li>
+	 *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, long, int, SequenceNumber)}</li>
+	 * </ul>
+	 */
+	private final List<KinesisStreamShardState> subscribedShardsState;
+
+	/** Context of the source function this fetcher runs in */
+	private final SourceFunction.SourceContext<T> sourceContext;
+
+	/** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
+	private final Object checkpointLock;
+
+	/** Reference to the first error thrown by any of the {@link ShardConsumer} threads */
+	private final AtomicReference<Throwable> error;
+
+	/** The Kinesis proxy that the fetcher will be using to discover new shards */
+	private final KinesisProxyInterface kinesis;
+
+	/** Thread that executed runFetcher() */
+	private Thread mainThread;
+
+	/**
+	 * The current number of shards that are actively read by this fetcher.
+	 *
+	 * <p>This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
+	 * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
+	 */
+	private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);
+
+	/** Whether the fetcher is running; runFetcher() returns immediately when this is already false */
+	private volatile boolean running = true;
+
+	/**
+	 * Creates a Kinesis Data Fetcher. The checkpoint lock is taken from the source context, the
+	 * error holder and subscribed-shard list start out empty, and the Kinesis proxy is created
+	 * from the given configuration properties.
+	 *
+	 * @param streams the streams to subscribe to
+	 * @param sourceContext context of the source function
+	 * @param runtimeContext this subtask's runtime context
+	 * @param configProps the consumer configuration properties
+	 * @param deserializationSchema deserialization schema
+	 */
+	public KinesisDataFetcher(
+			List<String> streams,
+			SourceFunction.SourceContext<T> sourceContext,
+			RuntimeContext runtimeContext,
+			Properties configProps,
+			KinesisDeserializationSchema<T> deserializationSchema) {
+		this(
+			streams,
+			sourceContext,
+			sourceContext.getCheckpointLock(),           // lock shared with the source context
+			runtimeContext,
+			configProps,
+			deserializationSchema,
+			new AtomicReference<Throwable>(),            // no error recorded yet
+			new LinkedList<KinesisStreamShardState>(),   // no shards subscribed yet
+			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
+			KinesisProxy.create(configProps));
+	}
+
+ /** This constructor is exposed for testing purposes */
+ protected KinesisDataFetcher(List<String> streams,
+ SourceFunction.SourceContext<T> sourceContext,
+ Object checkpointLock,
+ RuntimeContext runtimeContext,
+ Properties configProps,
+ KinesisDeserializationSchema<T> deserializationSchema,
+ AtomicReference<Throwable> error,
+ LinkedList<KinesisStreamShardState> subscribedShardsState,
+ HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
+ KinesisProxyInterface kinesis) {
+ this.streams = checkNotNull(streams);
+ this.configProps = checkNotNull(configProps);
+ this.sourceContext = checkNotNull(sourceContext);
+ this.checkpointLock = checkNotNull(checkpointLock);
+ this.runtimeContext = checkNotNull(runtimeContext);
+ this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+ this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
+ this.deserializationSchema = checkNotNull(deserializationSchema);
+ this.kinesis = checkNotNull(kinesis);
+
+ this.error = checkNotNull(error);
+ this.subscribedShardsState = checkNotNull(subscribedShardsState);
+ this.subscribedStreamsToLastDiscoveredShardIds = checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
+
+ this.shardConsumersExecutor =
+ createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
+ }
+
+ /**
+  * Starts the fetcher. After starting the fetcher, it can only
+  * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
+  *
+  * <p>Before entering its discovery loop, the fetcher (1) registers the shards discovered on startup,
+  * (2) fails if none of the subscribed streams has a shard, and (3) submits a {@link ShardConsumer} for every
+  * registered shard that has not already been read to its end. It then repeatedly discovers shards created by
+  * resharding and starts consumers for them, until {@link KinesisDataFetcher#shutdownFetcher()} is called.
+  *
+  * @throws Exception the first error or exception thrown by the fetcher or any of the threads created by the fetcher.
+  */
+ public void runFetcher() throws Exception {
+
+     // check that we are running before proceeding
+     if (!running) {
+         return;
+     }
+
+     this.mainThread = Thread.currentThread();
+
+     // ------------------------------------------------------------------------
+     //  Procedures before starting the infinite while loop:
+     // ------------------------------------------------------------------------
+
+     // 1. query for any new shards that may have been created while the Kinesis consumer was not running,
+     //    and register them to the subscribedShardState list.
+     if (LOG.isDebugEnabled()) {
+         String logFormat = (!isRestoredFromFailure)
+             ? "Subtask {} is trying to discover initial shards ..."
+             : "Subtask {} is trying to discover any new shards that were created while the consumer wasn't " +
+                 "running due to failure ...";
+
+         LOG.debug(logFormat, indexOfThisConsumerSubtask);
+     }
+     List<KinesisStreamShard> newShardsCreatedWhileNotRunning = discoverNewShardsToSubscribe();
+     if (!newShardsCreatedWhileNotRunning.isEmpty()) {
+         // the starting state for new shards created while the consumer wasn't running depends on whether or not
+         // we are starting fresh (not restoring from a checkpoint); when we are starting fresh, this simply means
+         // all existing shards of streams we are subscribing to are new shards; when we are restoring from checkpoint,
+         // any new shards due to Kinesis resharding from the time of the checkpoint will be considered new shards.
+         // The starting state is the same for every such shard, so it is resolved once here instead of per shard;
+         // the configured initial position is only consulted when starting fresh.
+         final SentinelSequenceNumber startingStateForNewShard;
+         if (isRestoredFromFailure) {
+             startingStateForNewShard = SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
+         } else {
+             InitialPosition initialPosition = InitialPosition.valueOf(configProps.getProperty(
+                 ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION));
+             startingStateForNewShard = initialPosition.toSentinelSequenceNumber();
+         }
+
+         for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
+             if (LOG.isInfoEnabled()) {
+                 String logFormat = (!isRestoredFromFailure)
+                     ? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}"
+                     : "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't " +
+                         "running due to failure, starting state set as sequence number {}";
+
+                 LOG.info(logFormat, indexOfThisConsumerSubtask, shard.toString(), startingStateForNewShard.get());
+             }
+             registerNewSubscribedShardState(new KinesisStreamShardState(shard, startingStateForNewShard.get()));
+         }
+     }
+
+     // 2. check that there is at least one shard in the subscribed streams to consume from (can be done by
+     //    checking if at least one value in subscribedStreamsToLastDiscoveredShardIds is not null)
+     boolean hasShards = false;
+     int numStreamsWithNoShardsFound = 0;
+     StringBuilder streamsWithNoShardsFound = new StringBuilder();
+     for (Map.Entry<String, String> streamToLastDiscoveredShardEntry : subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
+         if (streamToLastDiscoveredShardEntry.getValue() != null) {
+             hasShards = true;
+         } else {
+             // separate the stream names with ", " without leaving a trailing separator in the warning
+             if (numStreamsWithNoShardsFound > 0) {
+                 streamsWithNoShardsFound.append(", ");
+             }
+             streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey());
+             numStreamsWithNoShardsFound++;
+         }
+     }
+
+     if (numStreamsWithNoShardsFound > 0 && LOG.isWarnEnabled()) {
+         LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}",
+             indexOfThisConsumerSubtask, streamsWithNoShardsFound.toString());
+     }
+
+     if (!hasShards) {
+         throw new RuntimeException("No shards can be found for all subscribed streams: " + streams);
+     }
+
+     // 3. start consuming any shard state we already have in the subscribedShardState up to this point; the
+     //    subscribedShardState may already be seeded with values due to step 1., or explicitly added by the
+     //    consumer using a restored state checkpoint
+     for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
+         KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);
+
+         // only start a consuming thread if the seeded subscribed shard has not been completely read already
+         if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+
+             if (LOG.isInfoEnabled()) {
+                 LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
+                     indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
+                     seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
+             }
+
+             shardConsumersExecutor.submit(
+                 new ShardConsumer<>(
+                     this,
+                     seededStateIndex,
+                     seededShardState.getKinesisStreamShard(),
+                     seededShardState.getLastProcessedSequenceNum()));
+         }
+     }
+
+     // ------------------------------------------------------------------------
+
+     // finally, start the infinite shard discovery and consumer launching loop;
+     // we will escape from this loop only when shutdownFetcher() or stopWithError() is called
+
+     final long discoveryIntervalMillis = Long.parseLong(
+         configProps.getProperty(
+             ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
+             Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
+
+     // FLINK-4341:
+     // For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark
+     // for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise
+     // the downstream watermarks would not advance, leading to unbounded accumulating state.
+     //
+     // The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard
+     // is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks
+     // will be messed up.
+     //
+     // There are 2 cases where we need to either emit a max value watermark, or deliberately fail hard:
+     //   (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max
+     //       value watermark. This case is encountered when 1) all previously read shards by this subtask were closed
+     //       due to resharding, 2) when this subtask was initially only subscribed to closed shards while the consumer
+     //       was told to start from TRIM_HORIZON, or 3) there were initially no shards for this subtask to read on startup.
+     //   (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted
+     //       a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards
+     //       will be subscribed by this subtask after restore as initial shards on startup.
+     //
+     // TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager
+     // Please see FLINK-4341 for more detail
+
+     boolean emittedMaxValueWatermark = false;
+
+     if (this.numberOfActiveShards.get() == 0) {
+         // FLINK-4341 workaround case (a) - please see the above for details on this case
+         LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...",
+             indexOfThisConsumerSubtask);
+         sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+         emittedMaxValueWatermark = true;
+     }
+
+     while (running) {
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
+                 indexOfThisConsumerSubtask);
+         }
+         List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();
+
+         // -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards --
+         // Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists
+         // a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards
+         // may not correctly reflect the discovery result in the below case determination. This may lead to incorrect
+         // case determination on the current discovery attempt, but can still be correctly handled on future attempts.
+         //
+         // Although this can be resolved by wrapping the current shard discovery attempt with the below
+         // case determination within a synchronized block on the checkpoint lock for atomicity, there will be
+         // considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore,
+         // since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as
+         // we can still eventually handle max value watermark emitting / deliberately failing on successive
+         // discovery attempts.
+
+         if (newShardsDueToResharding.isEmpty() && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
+             // FLINK-4341 workaround case (a) - please see the above for details on this case
+             LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...",
+                 indexOfThisConsumerSubtask);
+             sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+             emittedMaxValueWatermark = true;
+         } else if (!newShardsDueToResharding.isEmpty() && emittedMaxValueWatermark) {
+             // FLINK-4341 workaround case (b) - please see the above for details on this case
+             //
+             // Note that in the case where on resharding this subtask ceased to read all of its previous shards
+             // but new shards are also to be subscribed by this subtask immediately after, emittedMaxValueWatermark
+             // will be false; this allows the fetcher to continue reading the new shards without failing on such cases.
+             // However, due to the race condition mentioned above, we might still fall into case (a) first, and
+             // then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value
+             // watermark emitting still remains correct.
+
+             LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" +
+                     " up watermarks; the new shards will be subscribed by this subtask after restore ...",
+                 indexOfThisConsumerSubtask, newShardsDueToResharding.size());
+             throw new RuntimeException("Deliberate failure to avoid messing up watermarks");
+         }
+
+         for (KinesisStreamShard shard : newShardsDueToResharding) {
+             // since there may be delay in discovering a new shard, all new shards due to
+             // resharding should be read starting from the earliest record possible
+             KinesisStreamShardState newShardState =
+                 new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
+             int newStateIndex = registerNewSubscribedShardState(newShardState);
+
+             if (LOG.isInfoEnabled()) {
+                 LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming " +
+                         "the shard from sequence number {} with ShardConsumer {}",
+                     indexOfThisConsumerSubtask, newShardState.getKinesisStreamShard().toString(),
+                     newShardState.getLastProcessedSequenceNum(), newStateIndex);
+             }
+
+             shardConsumersExecutor.submit(
+                 new ShardConsumer<>(
+                     this,
+                     newStateIndex,
+                     newShardState.getKinesisStreamShard(),
+                     newShardState.getLastProcessedSequenceNum()));
+         }
+
+         // we also check if we are running here so that we won't start the discovery sleep
+         // interval if the running flag was set to false during the middle of the while loop
+         if (running && discoveryIntervalMillis != 0) {
+             try {
+                 Thread.sleep(discoveryIntervalMillis);
+             } catch (InterruptedException ignored) {
+                 // deliberately ignored: the sleep may be interrupted by shutdownFetcher(), which sets the
+                 // running flag to false before interrupting, so the loop condition ends the loop
+             }
+         }
+     }
+
+     // make sure all resources have been terminated before leaving
+     awaitTermination();
+
+     // any error thrown in the shard consumer threads will be thrown to the main thread
+     Throwable throwable = this.error.get();
+     if (throwable != null) {
+         if (throwable instanceof Exception) {
+             throw (Exception) throwable;
+         } else if (throwable instanceof Error) {
+             throw (Error) throwable;
+         } else {
+             throw new Exception(throwable);
+         }
+     }
+ }
+
+ /**
+  * Captures the last processed sequence number of every subscribed shard.
+  *
+  * <p>The caller must hold the checkpoint lock; this is checked with an assertion (only when assertions are enabled).
+  *
+  * @return a newly created map from each subscribed shard to its last processed sequence number
+  */
+ public HashMap<KinesisStreamShard, SequenceNumber> snapshotState() {
+     // this method assumes that the checkpoint lock is held
+     assert Thread.holdsLock(checkpointLock);
+
+     HashMap<KinesisStreamShard, SequenceNumber> snapshot = new HashMap<>();
+     for (KinesisStreamShardState state : subscribedShardsState) {
+         KinesisStreamShard shard = state.getKinesisStreamShard();
+         SequenceNumber lastProcessed = state.getLastProcessedSequenceNum();
+         snapshot.put(shard, lastProcessed);
+     }
+     return snapshot;
+ }
+
+ /**
+ * Starts shutting down the fetcher. Must be called to allow {@link KinesisDataFetcher#runFetcher()} to complete.
+ * Once called, the shutdown procedure will be executed and all shard consuming threads will be interrupted.
+ */
+ public void shutdownFetcher() {
+ running = false;
+ mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
+ }
+ shardConsumersExecutor.shutdownNow();
+ }
+
+ /**
+  * After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown.
+  * Blocks until all shard consumer threads have terminated.
+  *
+  * @throws InterruptedException if the calling thread is interrupted while waiting
+  */
+ public void awaitTermination() throws InterruptedException {
+     // block on the executor itself instead of polling isTerminated() every 50 ms; the loop only
+     // guards against the (practically unreachable) expiry of the maximal timeout
+     while (!shardConsumersExecutor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS)) {
+         // keep waiting until the executor reports termination
+     }
+ }
+
+ /**
+  * Called by created threads to pass on errors. Only the first error handed in is recorded; recording it
+  * triggers {@link KinesisDataFetcher#shutdownFetcher()}, which interrupts all shard consuming threads.
+  * Any later error is dropped.
+  *
+  * @param throwable the error encountered by the calling thread
+  */
+ protected void stopWithError(Throwable throwable) {
+     boolean isFirstError = this.error.compareAndSet(null, throwable);
+     if (!isFirstError) {
+         // an earlier error was already recorded and has already triggered the shutdown
+         return;
+     }
+     shutdownFetcher();
+ }
+
+ // ------------------------------------------------------------------------
+ // Functions that update the subscribedStreamsToLastDiscoveredShardIds state
+ // ------------------------------------------------------------------------
+
+ /** Updates the last discovered shard of a subscribed stream; only updates if the update is valid */
+ public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
+ String lastSeenShardIdOfStream = this.subscribedStreamsToLastDiscoveredShardIds.get(stream);
+
+ // the update is valid only if the given shard id is greater
+ // than the previous last seen shard id of the stream
+ if (lastSeenShardIdOfStream == null) {
+ // if not previously set, simply put as the last seen shard id
+ this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
+ } else if (KinesisStreamShard.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
+ this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
+ }
+ }
+
+ /**
+  * Queries for shards of the subscribed streams that come after the last discovered shard of each stream, then:
+  * <ol>
+  *   <li>collects the ones that this consumer subtask should subscribe to (decided by
+  *       isThisSubtaskShouldSubscribeTo), and</li>
+  *   <li>advances the last discovered shard id of every stream that had retrieved shards, so that shards
+  *       already seen are not returned again the next time this method is called.</li>
+  * </ol>
+  *
+  * @return the newly discovered shards assigned to this subtask; empty if there are none
+  * @throws InterruptedException propagated from the shard list query
+  */
+ private List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException {
+     List<KinesisStreamShard> shardsForThisSubtask = new LinkedList<>();
+
+     GetShardListResult retrieved = kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
+     if (!retrieved.hasRetrievedShards()) {
+         return shardsForThisSubtask;
+     }
+
+     for (String stream : retrieved.getStreamsWithRetrievedShards()) {
+         for (KinesisStreamShard candidate : retrieved.getRetrievedShardListOfStream(stream)) {
+             if (isThisSubtaskShouldSubscribeTo(candidate, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
+                 shardsForThisSubtask.add(candidate);
+             }
+         }
+
+         // remember the last seen shard of this stream for the next discovery attempt
+         String lastSeenShardId = retrieved.getLastSeenShardOfStream(stream).getShard().getShardId();
+         advanceLastDiscoveredShardOfStream(stream, lastSeenShardId);
+     }
+     return shardsForThisSubtask;
+ }
+
+ // ------------------------------------------------------------------------
+ // Functions to get / set information about the consumer
+ // ------------------------------------------------------------------------
+
+ /**
+  * Sets whether this fetcher is started after restoring from a failure. The flag decides the starting
+  * sequence number of shards discovered on startup in {@link KinesisDataFetcher#runFetcher()}.
+  *
+  * @param isRestoredFromFailure {@code true} if the fetcher is restored from a failure
+  */
+ public void setIsRestoringFromFailure(boolean isRestoredFromFailure) {
+     this.isRestoredFromFailure = isRestoredFromFailure;
+ }
+
+ /**
+  * Returns the consumer configuration properties given to this fetcher on construction.
+  *
+  * @return the configuration properties (the same instance, not a copy)
+  */
+ protected Properties getConsumerConfiguration() {
+     return this.configProps;
+ }
+
+ /**
+  * Returns a copy of the deserialization schema, cloned with the user code class loader of the runtime context.
+  *
+  * @return a fresh clone of the deserialization schema
+  * @throws RuntimeException wrapping the cause if the schema cannot be cloned
+  */
+ protected KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
+     ClassLoader userCodeClassLoader = runtimeContext.getUserCodeClassLoader();
+     try {
+         return InstantiationUtil.clone(deserializationSchema, userCodeClassLoader);
+     } catch (IOException | ClassNotFoundException cloneFailure) {
+         // this really shouldn't happen; simply wrap it around a runtime exception
+         throw new RuntimeException(cloneFailure);
+     }
+ }
+
+ // ------------------------------------------------------------------------
+ // Thread-safe operations for record emitting and shard state updating
+ // that assure atomicity with respect to the checkpoint lock
+ // ------------------------------------------------------------------------
+
+ /**
+  * Atomically emits a record and advances the state of its shard to the record's sequence number.
+  * Both steps run while holding the checkpoint lock, so a state snapshot taken under the same lock
+  * (see {@link KinesisDataFetcher#snapshotState()}) sees either both or neither.
+  * This method is called by {@link ShardConsumer}s.
+  *
+  * @param record the record to collect
+  * @param recordTimestamp timestamp to attach to the collected record
+  * @param shardStateIndex index of the shard to update in subscribedShardsState;
+  *                        this index should be the returned value from
+  *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
+  *                        when the shard state was registered.
+  * @param lastSequenceNumber the last sequence number value to update
+  */
+ protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+     synchronized (this.checkpointLock) {
+         this.sourceContext.collectWithTimestamp(record, recordTimestamp);
+         this.updateState(shardStateIndex, lastSequenceNumber);
+     }
+ }
+
+ /**
+  * Sets the last processed sequence number of a subscribed shard, under the checkpoint lock.
+  * If the new value is {@code SENTINEL_SHARD_ENDING_SEQUENCE_NUM}, the shard has been fully read and
+  * the number of active shards is decremented.
+  * This method is called by {@link ShardConsumer}s.
+  *
+  * @param shardStateIndex index of the shard to update in subscribedShardsState;
+  *                        this index should be the returned value from
+  *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
+  *                        when the shard state was registered.
+  * @param lastSequenceNumber the last sequence number value to update
+  */
+ protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
+     synchronized (checkpointLock) {
+         KinesisStreamShardState shardState = subscribedShardsState.get(shardStateIndex);
+         shardState.setLastProcessedSequenceNum(lastSequenceNumber);
+
+         boolean reachedEndOfShard =
+             lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
+         if (!reachedEndOfShard) {
+             return;
+         }
+
+         // the consumer thread finished reading this shard, so it no longer counts as active
+         this.numberOfActiveShards.decrementAndGet();
+         LOG.info("Subtask {} has reached the end of subscribed shard: {}",
+             indexOfThisConsumerSubtask, shardState.getKinesisStreamShard());
+     }
+ }
+
+ /**
+ * Register a new subscribed shard state.
+ *
+ * @param newSubscribedShardState the new shard state that this fetcher is to be subscribed to
+ */
+ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
+ synchronized (checkpointLock) {
+ subscribedShardsState.add(newSubscribedShardState);
+
+ // If a registered shard has initial state that is not SENTINEL_SHARD_ENDING_SEQUENCE_NUM (will be the case
+ // if the consumer had already finished reading a shard before we failed and restored), we determine that
+ // this subtask has a new active shard
+ if (!newSubscribedShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+ this.numberOfActiveShards.incrementAndGet();
+ }
+
+ return subscribedShardsState.size()-1;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Miscellaneous utility functions
+ // ------------------------------------------------------------------------
+
+ /**
+  * Decides whether a shard is assigned to this consumer subtask, based on the shard's hash code.
+  *
+  * @param shard the shard to determine
+  * @param totalNumberOfConsumerSubtasks total number of consumer subtasks
+  * @param indexOfThisConsumerSubtask index of this consumer subtask
+  * @return {@code true} if this subtask should subscribe to the shard
+  */
+ private static boolean isThisSubtaskShouldSubscribeTo(KinesisStreamShard shard,
+                                                       int totalNumberOfConsumerSubtasks,
+                                                       int indexOfThisConsumerSubtask) {
+     // the remainder is taken before Math.abs, so for a positive subtask count the result is always
+     // within [0, totalNumberOfConsumerSubtasks), even for negative hash codes
+     int assignedSubtaskIndex = Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks);
+     return assignedSubtaskIndex == indexOfThisConsumerSubtask;
+ }
+
+ /**
+  * Creates the thread pool that runs the shard consumers of this subtask.
+  *
+  * <p>Threads are daemon threads named {@code shardConsumers-<subtaskName>-thread-<n>}, where {@code n}
+  * counts up from 0 across all threads created by the returned pool.
+  *
+  * @param subtaskName the name of this subtask, used in the thread names
+  * @return a cached thread pool for the shard consumers
+  */
+ private static ExecutorService createShardConsumersThreadPool(final String subtaskName) {
+     return Executors.newCachedThreadPool(new ThreadFactory() {
+
+         // one counter per factory (i.e. per pool), so that every created thread gets a distinct name
+         private final AtomicLong threadCount = new AtomicLong(0);
+
+         @Override
+         public Thread newThread(Runnable runnable) {
+             Thread thread = new Thread(runnable);
+             thread.setName("shardConsumers-" + subtaskName + "-thread-" + threadCount.getAndIncrement());
+             thread.setDaemon(true);
+             return thread;
+         }
+     });
+ }
+
+ /**
+  * Builds the initial map of the last discovered shard id of each subscribed stream. Every stream is mapped
+  * to {@code null}, meaning that no shard has been discovered for it yet; the real values are filled in later
+  * through advanceLastDiscoveredShardOfStream().
+  *
+  * @param streams the list of subscribed streams
+  * @return a new mutable map with one {@code null}-valued entry per stream
+  */
+ protected static HashMap<String, String> createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
+     HashMap<String, String> lastDiscoveredShardIdPerStream = new HashMap<>();
+     for (String streamName : streams) {
+         // null marks "no shard discovered yet" for this stream
+         lastDiscoveredShardIdPerStream.put(streamName, null);
+     }
+     return lastDiscoveredShardIdPerStream;
+ }
+}