Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:17 UTC
[27/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
new file mode 100644
index 0000000..612a4a7
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Thread that does the actual data pulling from AWS Kinesis shards. Each thread is in charge of one Kinesis shard only.
+ */
+public class ShardConsumer<T> implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
+
+ private final KinesisDeserializationSchema<T> deserializer;
+
+ private final KinesisProxyInterface kinesis;
+
+ private final int subscribedShardStateIndex;
+
+ private final KinesisDataFetcher<T> fetcherRef;
+
+ private final KinesisStreamShard subscribedShard;
+
+ private final int maxNumberOfRecordsPerFetch;
+ private final long fetchIntervalMillis;
+
+ private SequenceNumber lastSequenceNum;
+
+ /**
+ * Creates a shard consumer.
+ *
+ * @param fetcherRef reference to the owning fetcher
+ * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
+ * @param subscribedShard the shard this consumer is subscribed to
+ * @param lastSequenceNum the sequence number in the shard to start consuming
+ */
+ public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+ Integer subscribedShardStateIndex,
+ KinesisStreamShard subscribedShard,
+ SequenceNumber lastSequenceNum) {
+ this(fetcherRef,
+ subscribedShardStateIndex,
+ subscribedShard,
+ lastSequenceNum,
+ KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
+ }
+
+ /** This constructor is exposed for testing purposes */
+ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+ Integer subscribedShardStateIndex,
+ KinesisStreamShard subscribedShard,
+ SequenceNumber lastSequenceNum,
+ KinesisProxyInterface kinesis) {
+ this.fetcherRef = checkNotNull(fetcherRef);
+ this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
+ this.subscribedShard = checkNotNull(subscribedShard);
+ this.lastSequenceNum = checkNotNull(lastSequenceNum);
+ checkArgument(
+ !lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
+ "Should not start a ShardConsumer if the shard has already been completely read.");
+
+ this.deserializer = fetcherRef.getClonedDeserializationSchema();
+
+ Properties consumerConfig = fetcherRef.getConsumerConfiguration();
+ this.kinesis = kinesis;
+ this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty(
+ ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
+ Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
+ this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
+ ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
+ Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ String nextShardItr;
+
+ try {
+ // before infinitely looping, we set the initial nextShardItr appropriately
+
+ if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
+ // if the shard is already closed, there will be no latest next record to get for this shard
+ if (subscribedShard.isClosed()) {
+ nextShardItr = null;
+ } else {
+ nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
+ }
+ } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
+ nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
+ } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+ nextShardItr = null;
+ } else {
+ // we will be starting from an actual sequence number (due to restore from failure).
+ // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
+ // from the last aggregated record; otherwise, we can simply start iterating from the record right after.
+
+ if (lastSequenceNum.isAggregated()) {
+ String itrForLastAggregatedRecord =
+ kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+
+ // get only the last aggregated record
+ GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
+
+ List<UserRecord> fetchedRecords = deaggregateRecords(
+ getRecordsResult.getRecords(),
+ subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
+ subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+
+ long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber();
+ for (UserRecord record : fetchedRecords) {
+ // we have found a dangling sub-record if it has a larger subsequence number
+ // than our last sequence number; if so, collect the record and update state
+ if (record.getSubSequenceNumber() > lastSubSequenceNum) {
+ deserializeRecordForCollectionAndUpdateState(record);
+ }
+ }
+
+ // set the nextShardItr so we can continue iterating in the next while loop
+ nextShardItr = getRecordsResult.getNextShardIterator();
+ } else {
+ // the last record was non-aggregated, so we can simply start from the next record
+ nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+ }
+ }
+
+ while (isRunning()) {
+ if (nextShardItr == null) {
+ fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
+
+ // we can close this consumer thread once we've reached the end of the subscribed shard
+ break;
+ } else {
+ if (fetchIntervalMillis != 0) {
+ Thread.sleep(fetchIntervalMillis);
+ }
+
+ GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
+
+ // each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
+ List<UserRecord> fetchedRecords = deaggregateRecords(
+ getRecordsResult.getRecords(),
+ subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
+ subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+
+ for (UserRecord record : fetchedRecords) {
+ deserializeRecordForCollectionAndUpdateState(record);
+ }
+
+ nextShardItr = getRecordsResult.getNextShardIterator();
+ }
+ }
+ } catch (Throwable t) {
+ fetcherRef.stopWithError(t);
+ }
+ }
+
+ /**
+ * The loop in run() checks this before fetching the next batch of records. Since this runnable will be executed
+ * by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread
+ * would be by calling shutdownNow() on {@link KinesisDataFetcher#shardConsumersExecutor} and letting the executor service
+ * interrupt all currently running {@link ShardConsumer}s.
+ */
+ private boolean isRunning() {
+ return !Thread.interrupted();
+ }
+
+ /**
+ * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last
+ * successfully collected sequence number in this shard consumer is also updated so that
+ * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
+ * iterators if necessary.
+ *
+ * Note that the server-side Kinesis timestamp is attached to the record when collected. When the
+ * user program uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
+ *
+ * @param record record to deserialize and collect
+ * @throws IOException
+ */
+ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
+ throws IOException {
+ ByteBuffer recordData = record.getData();
+
+ byte[] dataBytes = new byte[recordData.remaining()];
+ recordData.get(dataBytes);
+
+ final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime();
+
+ final T value = deserializer.deserialize(
+ dataBytes,
+ record.getPartitionKey(),
+ record.getSequenceNumber(),
+ approxArrivalTimestamp,
+ subscribedShard.getStreamName(),
+ subscribedShard.getShard().getShardId());
+
+ SequenceNumber collectedSequenceNumber = (record.isAggregated())
+ ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
+ : new SequenceNumber(record.getSequenceNumber());
+
+ fetcherRef.emitRecordAndUpdateState(
+ value,
+ approxArrivalTimestamp,
+ subscribedShardStateIndex,
+ collectedSequenceNumber);
+
+ lastSequenceNum = collectedSequenceNumber;
+ }
+
+ /**
+ * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
+ * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
+ * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
+ * be used for the next call to this method.
+ *
+ * Note: it is important that this method is not called again before all the records from the last result have been
+ * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
+ * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
+ * incorrect shard iteration if the iterator had to be refreshed.
+ *
+ * @param shardItr shard iterator to use
+ * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
+ * @return get records result
+ * @throws InterruptedException
+ */
+ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
+ GetRecordsResult getRecordsResult = null;
+ while (getRecordsResult == null) {
+ try {
+ getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
+ } catch (ExpiredIteratorException eiEx) {
+ LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
+ " refreshing the iterator ...", shardItr, subscribedShard);
+ shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+
+ // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
+ if (fetchIntervalMillis != 0) {
+ Thread.sleep(fetchIntervalMillis);
+ }
+ }
+ }
+ return getRecordsResult;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
+ return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
+ }
+}
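
The isRunning() javadoc above describes the shutdown contract, but the owning fetcher is not part of this diff. Below is a minimal, self-contained sketch of that executor pattern; the class name, variable names, and timeout value are illustrative only, not taken from the Flink sources.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ShardConsumerShutdownSketch {
        public static void main(String[] args) throws InterruptedException {
            // stand-in for KinesisDataFetcher#shardConsumersExecutor (name taken from the javadoc above)
            ExecutorService shardConsumersExecutor = Executors.newCachedThreadPool();

            // each subscribed shard gets its own Runnable consumer submitted to the executor
            shardConsumersExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    // stand-in for ShardConsumer#run(): keep fetching until the thread is interrupted,
                    // mirroring isRunning() == !Thread.interrupted()
                    while (!Thread.interrupted()) {
                        // getRecords(...) / deserializeRecordForCollectionAndUpdateState(...) would go here
                    }
                }
            });

            // shutdownNow() interrupts all running consumers; per the isRunning() javadoc,
            // this is the only way to stop the consumer threads
            shardConsumersExecutor.shutdownNow();
            shardConsumersExecutor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }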
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
new file mode 100644
index 0000000..53ed11b
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import com.amazonaws.services.kinesis.model.Shard;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of an AWS Kinesis Stream shard. It is essentially a wrapper class around the information
+ * provided by {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to
+ * determine whether a shard is closed and whether the shard is a result of parent shard splits or merges.
+ */
+public class KinesisStreamShard implements Serializable {
+
+ private static final long serialVersionUID = -6004217801761077536L;
+
+ private final String streamName;
+ private final Shard shard;
+
+ private final int cachedHash;
+
+ /**
+ * Create a new KinesisStreamShard
+ *
+ * @param streamName
+ * the name of the Kinesis stream that this shard belongs to
+ * @param shard
+ * the actual AWS Shard instance that will be wrapped within this KinesisStreamShard
+ */
+ public KinesisStreamShard(String streamName, Shard shard) {
+ this.streamName = checkNotNull(streamName);
+ this.shard = checkNotNull(shard);
+
+ // since our description of Kinesis Streams shards can be fully defined with the stream name and shard id,
+ // our hash doesn't need to use hash code of Amazon's description of Shards, which uses other info for calculation
+ int hash = 17;
+ hash = 37 * hash + streamName.hashCode();
+ hash = 37 * hash + shard.getShardId().hashCode();
+ this.cachedHash = hash;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public boolean isClosed() {
+ return (shard.getSequenceNumberRange().getEndingSequenceNumber() != null);
+ }
+
+ public Shard getShard() {
+ return shard;
+ }
+
+ @Override
+ public String toString() {
+ return "KinesisStreamShard{" +
+ "streamName='" + streamName + "'" +
+ ", shard='" + shard.toString() + "'}";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof KinesisStreamShard)) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ KinesisStreamShard other = (KinesisStreamShard) obj;
+
+ return streamName.equals(other.getStreamName()) && shard.equals(other.getShard());
+ }
+
+ @Override
+ public int hashCode() {
+ return cachedHash;
+ }
+
+ /**
+ * Utility function to compare two shard ids
+ *
+ * @param firstShardId first shard id to compare
+ * @param secondShardId second shard id to compare
+ * @return a value less than 0 if the first shard id is smaller than the second shard id,
+ * or a value larger than 0 if the first shard id is larger than the second shard id,
+ * or 0 if they are equal
+ */
+ public static int compareShardIds(String firstShardId, String secondShardId) {
+ if (!isValidShardId(firstShardId)) {
+ throw new IllegalArgumentException("The first shard id has invalid format.");
+ }
+
+ if (!isValidShardId(secondShardId)) {
+ throw new IllegalArgumentException("The second shard id has invalid format.");
+ }
+
+ // digit segment of the shard id starts at index 8
+ return Long.compare(Long.parseLong(firstShardId.substring(8)), Long.parseLong(secondShardId.substring(8)));
+ }
+
+ /**
+ * Checks if a shard id has valid format.
+ * Kinesis stream shard ids are 12-digit numbers left-padded with 0's,
+ * prefixed with "shardId-", e.g. "shardId-000000000015".
+ *
+ * @param shardId the shard id to check
+ * @return whether the shard id is valid
+ */
+ public static boolean isValidShardId(String shardId) {
+ if (shardId == null) { return false; }
+ return shardId.matches("^shardId-\\d{12}");
+ }
+}
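
A minimal usage sketch for the shard id utilities defined above; the class name and shard id values are illustrative only.

    import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;

    public class ShardIdUtilsExample {
        public static void main(String[] args) {
            // a valid Kinesis shard id is "shardId-" followed by a 12-digit, zero-padded number
            System.out.println(KinesisStreamShard.isValidShardId("shardId-000000000015")); // true
            System.out.println(KinesisStreamShard.isValidShardId("shard-15"));             // false

            // compares only the numeric suffix; prints a negative value because 3 < 15
            System.out.println(KinesisStreamShard.compareShardIds(
                "shardId-000000000003", "shardId-000000000015"));
        }
    }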
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
new file mode 100644
index 0000000..00181da
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+/**
+ * A wrapper class that bundles a {@link KinesisStreamShard} with its last processed sequence number.
+ */
+public class KinesisStreamShardState {
+
+ private KinesisStreamShard kinesisStreamShard;
+ private SequenceNumber lastProcessedSequenceNum;
+
+ public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, SequenceNumber lastProcessedSequenceNum) {
+ this.kinesisStreamShard = kinesisStreamShard;
+ this.lastProcessedSequenceNum = lastProcessedSequenceNum;
+ }
+
+ public KinesisStreamShard getKinesisStreamShard() {
+ return this.kinesisStreamShard;
+ }
+
+ public SequenceNumber getLastProcessedSequenceNum() {
+ return this.lastProcessedSequenceNum;
+ }
+
+ public void setLastProcessedSequenceNum(SequenceNumber update) {
+ this.lastProcessedSequenceNum = update;
+ }
+
+ @Override
+ public String toString() {
+ return "KinesisStreamShardState{" +
+ "kinesisStreamShard='" + kinesisStreamShard.toString() + "'" +
+ ", lastProcessedSequenceNumber='" + lastProcessedSequenceNum.toString() + "'}";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof KinesisStreamShardState)) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ KinesisStreamShardState other = (KinesisStreamShardState) obj;
+
+ return kinesisStreamShard.equals(other.getKinesisStreamShard()) && lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
+ }
+
+ @Override
+ public int hashCode() {
+ return 37 * (kinesisStreamShard.hashCode() + lastProcessedSequenceNum.hashCode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
new file mode 100644
index 0000000..8182201
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+
+/**
+ * Special flag values for sequence numbers in shards to indicate special positions.
+ * The value is initially set by {@link FlinkKinesisConsumer} when {@link KinesisDataFetcher}s are created.
+ * The KinesisDataFetchers will use this value to determine how to retrieve the starting shard iterator from AWS Kinesis.
+ */
+public enum SentinelSequenceNumber {
+
+ /** Flag value for shard's sequence numbers to indicate that the
+ * shard should start to be read from the latest incoming records */
+ SENTINEL_LATEST_SEQUENCE_NUM( new SequenceNumber("LATEST_SEQUENCE_NUM") ),
+
+ /** Flag value for shard's sequence numbers to indicate that the shard should
+ * start to be read from the earliest records that haven't expired yet */
+ SENTINEL_EARLIEST_SEQUENCE_NUM( new SequenceNumber("EARLIEST_SEQUENCE_NUM") ),
+
+ /** Flag value to indicate that we have already read the last record of this shard
+ * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */
+ SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") );
+
+ private SequenceNumber sentinel;
+
+ SentinelSequenceNumber(SequenceNumber sentinel) {
+ this.sentinel = sentinel;
+ }
+
+ public SequenceNumber get() {
+ return sentinel;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
new file mode 100644
index 0000000..021f53f
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of a Kinesis record's sequence number. It has two fields: the main sequence number,
+ * and also a subsequence number. If this {@link SequenceNumber} is referring to an aggregated Kinesis record, the
+ * subsequence number will be a non-negative value representing the order of the sub-record within the aggregation.
+ */
+public class SequenceNumber implements Serializable {
+
+ private static final long serialVersionUID = 876972197938972667L;
+
+ private static final String DELIMITER = "-";
+
+ private final String sequenceNumber;
+ private final long subSequenceNumber;
+
+ private final int cachedHash;
+
+ /**
+ * Create a new instance for a non-aggregated Kinesis record without a subsequence number.
+ * @param sequenceNumber the sequence number
+ */
+ public SequenceNumber(String sequenceNumber) {
+ this(sequenceNumber, -1);
+ }
+
+ /**
+ * Create a new instance, with the specified sequence number and subsequence number.
+ * To represent the sequence number for a non-aggregated Kinesis record, the subsequence number should be -1.
+ * Otherwise, give a non-negative subsequence number to represent an aggregated Kinesis record.
+ *
+ * @param sequenceNumber the sequence number
+ * @param subSequenceNumber the subsequence number (-1 to represent non-aggregated Kinesis records)
+ */
+ public SequenceNumber(String sequenceNumber, long subSequenceNumber) {
+ this.sequenceNumber = checkNotNull(sequenceNumber);
+ this.subSequenceNumber = subSequenceNumber;
+
+ this.cachedHash = 37 * (sequenceNumber.hashCode() + Long.valueOf(subSequenceNumber).hashCode());
+ }
+
+ public boolean isAggregated() {
+ return subSequenceNumber >= 0;
+ }
+
+ public String getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public long getSubSequenceNumber() {
+ return subSequenceNumber;
+ }
+
+ @Override
+ public String toString() {
+ if (isAggregated()) {
+ return sequenceNumber + DELIMITER + subSequenceNumber;
+ } else {
+ return sequenceNumber;
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof SequenceNumber)) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ SequenceNumber other = (SequenceNumber) obj;
+
+ return sequenceNumber.equals(other.getSequenceNumber())
+ && (subSequenceNumber == other.getSubSequenceNumber());
+ }
+
+ @Override
+ public int hashCode() {
+ return cachedHash;
+ }
+}
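
A short illustrative sketch of how the two constructors differ; the sequence number literal below is made up and only shows the expected shape.

    import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;

    public class SequenceNumberExample {
        public static void main(String[] args) {
            // non-aggregated record: the subsequence number defaults to -1
            SequenceNumber plain = new SequenceNumber("49545115243490985018280067714973144582180062593244200961");
            System.out.println(plain.isAggregated()); // false
            System.out.println(plain);                // just the sequence number

            // aggregated record: refers to the third sub-record (subsequence number 2) of the aggregate
            SequenceNumber aggregated = new SequenceNumber("49545115243490985018280067714973144582180062593244200961", 2);
            System.out.println(aggregated.isAggregated()); // true
            System.out.println(aggregated);                // "<sequence number>-2"
        }
    }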
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
new file mode 100644
index 0000000..04b1654
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Basic model class to bundle the shards retrieved from Kinesis on a {@link KinesisProxyInterface#getShardList(Map)} call.
+ */
+public class GetShardListResult {
+
+ private final Map<String, LinkedList<KinesisStreamShard>> streamsToRetrievedShardList = new HashMap<>();
+
+ public void addRetrievedShardToStream(String stream, KinesisStreamShard retrievedShard) {
+ if (!streamsToRetrievedShardList.containsKey(stream)) {
+ streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>());
+ }
+ streamsToRetrievedShardList.get(stream).add(retrievedShard);
+ }
+
+ public void addRetrievedShardsToStream(String stream, List<KinesisStreamShard> retrievedShards) {
+ if (retrievedShards.size() != 0) {
+ if (!streamsToRetrievedShardList.containsKey(stream)) {
+ streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>());
+ }
+ streamsToRetrievedShardList.get(stream).addAll(retrievedShards);
+ }
+ }
+
+ public List<KinesisStreamShard> getRetrievedShardListOfStream(String stream) {
+ if (!streamsToRetrievedShardList.containsKey(stream)) {
+ return null;
+ } else {
+ return streamsToRetrievedShardList.get(stream);
+ }
+ }
+
+ public KinesisStreamShard getLastSeenShardOfStream(String stream) {
+ if (!streamsToRetrievedShardList.containsKey(stream)) {
+ return null;
+ } else {
+ return streamsToRetrievedShardList.get(stream).getLast();
+ }
+ }
+
+ public boolean hasRetrievedShards() {
+ return !streamsToRetrievedShardList.isEmpty();
+ }
+
+ public Set<String> getStreamsWithRetrievedShards() {
+ return streamsToRetrievedShardList.keySet();
+ }
+
+}
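
A brief usage sketch of GetShardListResult; the stream name and shard id are made up for illustration.

    import com.amazonaws.services.kinesis.model.Shard;
    import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;

    public class GetShardListResultExample {
        public static void main(String[] args) {
            GetShardListResult result = new GetShardListResult();

            // register one discovered shard for a made-up stream name
            Shard shard = new Shard().withShardId("shardId-000000000000");
            result.addRetrievedShardToStream("example-stream", new KinesisStreamShard("example-stream", shard));

            // callers typically remember the last seen shard per stream for the next discovery call
            for (String stream : result.getStreamsWithRetrievedShards()) {
                System.out.println(stream + " -> last seen shard: "
                    + result.getLastSeenShardOfStream(stream).getShard().getShardId());
            }
        }
    }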
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
new file mode 100644
index 0000000..9ffc8e6
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Kinesis proxy implementation - a utility class that is used as a proxy to make
+ * calls to AWS Kinesis for several functions, such as getting a list of shards and
+ * fetching a batch of data records starting from a specified record sequence number.
+ *
+ * NOTE:
+ * In the AWS KCL library, there is a similar implementation - {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
+ * This implementation differs mainly in that it can perform operations on arbitrary Kinesis streams, which is
+ * needed functionality for the Flink Kinesis Connector since the consumer may simultaneously read from multiple Kinesis streams.
+ */
+public class KinesisProxy implements KinesisProxyInterface {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
+
+ /** The actual Kinesis client from the AWS SDK that we will be using to make calls */
+ private final AmazonKinesisClient kinesisClient;
+
+ /** Random number generator used to calculate backoff jitter for Kinesis operations */
+ private static final Random seed = new Random();
+
+ // ------------------------------------------------------------------------
+ // describeStream() related performance settings
+ // ------------------------------------------------------------------------
+
+ /** Base backoff millis for the describe stream operation */
+ private final long describeStreamBaseBackoffMillis;
+
+ /** Maximum backoff millis for the describe stream operation */
+ private final long describeStreamMaxBackoffMillis;
+
+ /** Exponential backoff power constant for the describe stream operation */
+ private final double describeStreamExpConstant;
+
+ // ------------------------------------------------------------------------
+ // getRecords() related performance settings
+ // ------------------------------------------------------------------------
+
+ /** Base backoff millis for the get records operation */
+ private final long getRecordsBaseBackoffMillis;
+
+ /** Maximum backoff millis for the get records operation */
+ private final long getRecordsMaxBackoffMillis;
+
+ /** Exponential backoff power constant for the get records operation */
+ private final double getRecordsExpConstant;
+
+ /** Maximum attempts for the get records operation */
+ private final int getRecordsMaxAttempts;
+
+ // ------------------------------------------------------------------------
+ // getShardIterator() related performance settings
+ // ------------------------------------------------------------------------
+
+ /** Base backoff millis for the get shard iterator operation */
+ private final long getShardIteratorBaseBackoffMillis;
+
+ /** Maximum backoff millis for the get shard iterator operation */
+ private final long getShardIteratorMaxBackoffMillis;
+
+ /** Exponential backoff power constant for the get shard iterator operation */
+ private final double getShardIteratorExpConstant;
+
+ /** Maximum attempts for the get shard iterator operation */
+ private final int getShardIteratorMaxAttempts;
+
+ /**
+ * Create a new KinesisProxy based on the supplied configuration properties
+ *
+ * @param configProps configuration properties containing AWS credential and AWS region info
+ */
+ private KinesisProxy(Properties configProps) {
+ checkNotNull(configProps);
+
+ this.kinesisClient = AWSUtil.createKinesisClient(configProps);
+
+ this.describeStreamBaseBackoffMillis = Long.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
+ Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
+ this.describeStreamMaxBackoffMillis = Long.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
+ Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
+ this.describeStreamExpConstant = Double.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
+ Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
+
+ this.getRecordsBaseBackoffMillis = Long.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
+ Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE)));
+ this.getRecordsMaxBackoffMillis = Long.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
+ Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX)));
+ this.getRecordsExpConstant = Double.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
+ Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
+ this.getRecordsMaxAttempts = Integer.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
+ Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
+
+ this.getShardIteratorBaseBackoffMillis = Long.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
+ Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE)));
+ this.getShardIteratorMaxBackoffMillis = Long.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
+ Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX)));
+ this.getShardIteratorExpConstant = Double.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
+ Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
+ this.getShardIteratorMaxAttempts = Integer.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
+ Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
+
+ }
+
+ /**
+ * Creates a Kinesis proxy.
+ *
+ * @param configProps configuration properties
+ * @return the created kinesis proxy
+ */
+ public static KinesisProxyInterface create(Properties configProps) {
+ return new KinesisProxy(configProps);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
+ final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+ getRecordsRequest.setShardIterator(shardIterator);
+ getRecordsRequest.setLimit(maxRecordsToGet);
+
+ GetRecordsResult getRecordsResult = null;
+
+ int attempt = 0;
+ while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
+ try {
+ getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
+ } catch (ProvisionedThroughputExceededException ex) {
+ long backoffMillis = fullJitterBackoff(
+ getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+ LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
+ + backoffMillis + " millis.");
+ Thread.sleep(backoffMillis);
+ }
+ }
+
+ if (getRecordsResult == null) {
+ throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
+ " retry attempts returned ProvisionedThroughputExceededException.");
+ }
+
+ return getRecordsResult;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
+ GetShardListResult result = new GetShardListResult();
+
+ for (Map.Entry<String,String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) {
+ String stream = streamNameWithLastSeenShardId.getKey();
+ String lastSeenShardId = streamNameWithLastSeenShardId.getValue();
+ result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId));
+ }
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
+ GetShardIteratorResult getShardIteratorResult = null;
+
+ int attempt = 0;
+ while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
+ try {
+ getShardIteratorResult =
+ kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
+ } catch (ProvisionedThroughputExceededException ex) {
+ long backoffMillis = fullJitterBackoff(
+ getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
+ LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
+ + backoffMillis + " millis.");
+ Thread.sleep(backoffMillis);
+ }
+ }
+
+ if (getShardIteratorResult == null) {
+ throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
+ " retry attempts returned ProvisionedThroughputExceededException.");
+ }
+ return getShardIteratorResult.getShardIterator();
+ }
+
+ private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
+ List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
+
+ DescribeStreamResult describeStreamResult;
+ do {
+ describeStreamResult = describeStream(streamName, lastSeenShardId);
+
+ List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+ for (Shard shard : shards) {
+ shardsOfStream.add(new KinesisStreamShard(streamName, shard));
+ }
+
+ if (shards.size() != 0) {
+ lastSeenShardId = shards.get(shards.size() - 1).getShardId();
+ }
+ } while (describeStreamResult.getStreamDescription().isHasMoreShards());
+
+ return shardsOfStream;
+ }
+
+ /**
+ * Get metadata for a Kinesis stream, which contains information about which shards this Kinesis stream possesses.
+ *
+ * This method uses a "full jitter" approach described in AWS's article,
+ * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>.
+ * This is necessary because concurrent calls will be made by the fetchers of all parallel subtasks. This
+ * jitter backoff approach will help distribute the calls across the fetchers over time.
+ *
+ * @param streamName the stream to describe
+ * @param startShardId which shard to start with for this describe operation (earlier shards' info will not appear in the result)
+ * @return the result of the describe stream operation
+ */
+ private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
+ final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
+ describeStreamRequest.setStreamName(streamName);
+ describeStreamRequest.setExclusiveStartShardId(startShardId);
+
+ DescribeStreamResult describeStreamResult = null;
+
+ // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
+ int attemptCount = 0;
+ while (describeStreamResult == null) { // retry until we get a result
+ try {
+ describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+ } catch (LimitExceededException le) {
+ long backoffMillis = fullJitterBackoff(
+ describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
+ LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
+ + backoffMillis + " millis.");
+ Thread.sleep(backoffMillis);
+ } catch (ResourceNotFoundException re) {
+ throw new RuntimeException("Error while getting stream details", re);
+ }
+ }
+
+ String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
+ if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
+ "describeStream operation will not contain any shard information.");
+ }
+ }
+
+ // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
+ // start shard id in the returned shards list; check if we need to remove these erroneously returned shards
+ if (startShardId != null) {
+ List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+ Iterator<Shard> shardItr = shards.iterator();
+ while (shardItr.hasNext()) {
+ if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
+ shardItr.remove();
+ }
+ }
+ }
+
+ return describeStreamResult;
+ }
+
+ private static long fullJitterBackoff(long base, long max, double power, int attempt) {
+ long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
+ return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff
+ }
+}
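
To make the backoff behaviour above concrete, here is a small stand-alone sketch of the fullJitterBackoff() formula. The base/max/exponent values below are illustrative only; the real values come from ConsumerConfigConstants defaults or user-supplied properties.

    import java.util.Random;

    public class FullJitterBackoffSketch {

        private static final Random seed = new Random();

        // same formula as KinesisProxy#fullJitterBackoff above
        static long fullJitterBackoff(long base, long max, double power, int attempt) {
            long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
            return (long) (seed.nextDouble() * exponentialBackoff); // jitter in [0, exponentialBackoff)
        }

        public static void main(String[] args) {
            // illustrative settings: 1s base, 5s cap, exponent 1.5
            long base = 1000, max = 5000;
            double power = 1.5;
            for (int attempt = 0; attempt < 5; attempt++) {
                long cap = (long) Math.min(max, base * Math.pow(power, attempt));
                // caps grow 1000, 1500, 2250, 3375, 5000 (clamped) ms; the actual sleep is uniform in [0, cap)
                System.out.println("attempt " + attempt + ": cap=" + cap
                    + " ms, sampled backoff=" + fullJitterBackoff(base, max, power, attempt) + " ms");
            }
        }
    }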
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
new file mode 100644
index 0000000..39ddc52
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.Map;
+
+/**
+ * Interface for a Kinesis proxy that operates on multiple Kinesis streams within the same AWS service region.
+ */
+public interface KinesisProxyInterface {
+
+ /**
+ * Get a shard iterator from the specified position in a shard.
+ * The retrieved shard iterator can be used in {@link KinesisProxyInterface#getRecords(String, int)}}
+ * to read data from the Kinesis shard.
+ *
+ * @param shard the shard to get the iterator for
+ * @param shardIteratorType the iterator type, defining how the shard is to be iterated
+ * (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
+ * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST
+ * @return shard iterator which can be used to read data from Kinesis
+ * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
+ * operation has exceeded the rate limit; this exception will be thrown
+ * if the backoff is interrupted.
+ */
+ String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException;
+
+ /**
+ * Get the next batch of data records using a specific shard iterator
+ *
+ * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading
+ * @param maxRecordsToGet the maximum number of records to retrieve for this batch
+ * @return the batch of retrieved records, also with a shard iterator that can be used to get the next batch
+ * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
+ * operation has exceeded the rate limit; this exception will be thrown
+ * if the backoff is interrupted.
+ */
+ GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException;
+
+ /**
+ * Get shard list of multiple Kinesis streams, ignoring the
+ * shards of each stream before a specified last seen shard id.
+ *
+ * @param streamNamesWithLastSeenShardIds a map with stream as key, and last seen shard id as value
+ * @return result of the shard list query
+ * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
+ * operation has exceeded the rate limit; this exception will be thrown
+ * if the backoff is interrupted.
+ */
+ GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException;
+}
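
Since ShardConsumer exposes a constructor that accepts a KinesisProxyInterface for testing, a trivial stub such as the following could stand in for the real KinesisProxy; the class name and the returned values are illustrative only.

    import com.amazonaws.services.kinesis.model.GetRecordsResult;
    import com.amazonaws.services.kinesis.model.Record;
    import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
    import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;

    import java.util.Collections;
    import java.util.Map;

    public class EmptyKinesisProxyStub implements KinesisProxyInterface {

        @Override
        public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
            return "fake-shard-iterator";
        }

        @Override
        public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
            // no records; a null next shard iterator tells the consumer the shard is fully read
            return new GetRecordsResult()
                .withRecords(Collections.<Record>emptyList())
                .withNextShardIterator(null);
        }

        @Override
        public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
            return new GetShardListResult();
        }
    }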
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
new file mode 100644
index 0000000..0effdd8
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * This is a deserialization schema specific for the Flink Kinesis Consumer. Different from the
+ * basic {@link DeserializationSchema}, this schema offers additional Kinesis-specific information
+ * about the record that may be useful to the user application.
+ *
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+ /**
+ * Deserializes a Kinesis record's bytes
+ *
+ * @param recordValue the record's value as a byte array
+ * @param partitionKey the record's partition key at the time of writing
+ * @param seqNum the sequence number of this record in the Kinesis shard
+ * @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record
+ * @param stream the name of the Kinesis stream that this record was sent to
+ * @param shardId The identifier of the shard the record was sent to
+ * @return the deserialized message as a Java object
+ * @throws IOException
+ */
+ T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException;
+
+ /**
+ * Method to decide whether the element signals the end of the stream. If
+ * true is returned the element won't be emitted.
+ *
+ * @param nextElement the element to test for the end-of-stream signal
+ * @return true if the element signals end of stream, false otherwise
+ */
+ // TODO FLINK-4194 ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
+}
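
An illustrative implementation of this interface that makes use of the Kinesis-specific metadata; the class name and the prefixing behaviour are examples, not part of the connector.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    // example schema that exposes the shard id by prefixing each record's value with it
    public class ShardAwareStringSchema implements KinesisDeserializationSchema<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String deserialize(byte[] recordValue, String partitionKey, String seqNum,
                long approxArrivalTimestamp, String stream, String shardId) throws IOException {
            return shardId + ": " + new String(recordValue, StandardCharsets.UTF_8);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }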
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..6e66038
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.io.IOException;
+
+/**
+ * A simple wrapper for using the {@link DeserializationSchema} with the {@link KinesisDeserializationSchema} interface.
+ *
+ * @param <T> The type created by the deserialization schema.
+ */
+public class KinesisDeserializationSchemaWrapper<T> implements KinesisDeserializationSchema<T> {
+ private static final long serialVersionUID = 9143148962928375886L;
+
+ private final DeserializationSchema<T> deserializationSchema;
+
+ public KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId)
+ throws IOException {
+ return deserializationSchema.deserialize(recordValue);
+ }
+
+ /*
+ FLINK-4194
+
+ @Override
+ public boolean isEndOfStream(T nextElement) {
+ return deserializationSchema.isEndOfStream(nextElement);
+ } */
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+}
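A minimal sketch of how the wrapper above might be used; the example class and main method are hypothetical, while SimpleStringSchema is Flink's existing plain string schema:

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class WrapperExample {
    public static void main(String[] args) {
        // Adapt a plain DeserializationSchema to the Kinesis-specific interface;
        // the wrapper ignores partition key, sequence number, arrival timestamp,
        // stream name and shard id, and only forwards the raw record bytes.
        KinesisDeserializationSchema<String> schema =
                new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema());
        System.out.println(schema.getProducedType());
    }
}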
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
new file mode 100644
index 0000000..03dd72c
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.serialization;
+
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Kinesis-specific serialization schema, allowing users to specify a target stream based
+ * on a record's contents.
+ * @param <T> The type of the elements to be serialized.
+ */
+public interface KinesisSerializationSchema<T> extends Serializable {
+ /**
+ * Serializes the given element into a ByteBuffer.
+ *
+ * @param element The element to serialize
+ * @return Serialized representation of the element
+ */
+ ByteBuffer serialize(T element);
+
+ /**
+ * Optional method to determine the target stream based on the element.
+ * Return <code>null</code> to use the default stream.
+ *
+ * @param element The element to determine the target stream from
+ * @return target stream name
+ */
+ String getTargetStream(T element);
+}
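Again as a sketch only, an implementation of the interface above could route records to different streams based on their content; the stream name "error-records" and the UTF-8 encoding are assumptions of this example:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;

public class RoutingSerializationSchema implements KinesisSerializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public ByteBuffer serialize(String element) {
        // encode the element as UTF-8 bytes
        return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String getTargetStream(String element) {
        // send records starting with "error" to a dedicated stream,
        // everything else to the producer's default stream (signalled by null)
        return element.startsWith("error") ? "error-records" : null;
    }
}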
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
new file mode 100644
index 0000000..cff69e5
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+
+import java.util.Properties;
+
+/**
+ * Some utilities specific to Amazon Web Services.
+ */
+public class AWSUtil {
+
+ /**
+ * Creates an Amazon Kinesis Client.
+ * @param configProps configuration properties containing the access key, secret key, and region
+ * @return a new Amazon Kinesis Client
+ */
+ public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+ // set a Flink-specific user agent
+ ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
+ awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
+ " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+
+ // enable automatic credential refresh by passing the AWSCredentialsProvider directly
+ AmazonKinesisClient client = new AmazonKinesisClient(
+ AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
+
+ client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
+ if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
+ client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+ }
+ return client;
+ }
+
+ /**
+ * Returns an {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
+ *
+ * @param configProps the configuration properties
+ * @return The corresponding AWS Credentials Provider instance
+ */
+ public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) {
+ CredentialProvider credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
+ AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, CredentialProvider.BASIC.toString()));
+
+ AWSCredentialsProvider credentialsProvider;
+
+ switch (credentialProviderType) {
+ case ENV_VAR:
+ credentialsProvider = new EnvironmentVariableCredentialsProvider();
+ break;
+ case SYS_PROP:
+ credentialsProvider = new SystemPropertiesCredentialsProvider();
+ break;
+ case PROFILE:
+ String profileName = configProps.getProperty(
+ AWSConfigConstants.AWS_PROFILE_NAME, null);
+ String profileConfigPath = configProps.getProperty(
+ AWSConfigConstants.AWS_PROFILE_PATH, null);
+ credentialsProvider = (profileConfigPath == null)
+ ? new ProfileCredentialsProvider(profileName)
+ : new ProfileCredentialsProvider(profileConfigPath, profileName);
+ break;
+ case AUTO:
+ credentialsProvider = new DefaultAWSCredentialsProviderChain();
+ break;
+ default:
+ case BASIC:
+ credentialsProvider = new AWSCredentialsProvider() {
+ @Override
+ public AWSCredentials getCredentials() {
+ return new BasicAWSCredentials(
+ configProps.getProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID),
+ configProps.getProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY));
+ }
+
+ @Override
+ public void refresh() {
+ // do nothing
+ }
+ };
+ }
+
+ return credentialsProvider;
+ }
+
+ /**
+ * Checks whether the given region ID is valid.
+ *
+ * @param region The AWS region ID to check
+ * @return true if the supplied region ID is valid, false otherwise
+ */
+ public static boolean isValidRegion(String region) {
+ try {
+ Regions.fromName(region.toLowerCase());
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ return true;
+ }
+}
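To show how getCredentialsProvider() resolves the provider type, here is a hedged example using the PROFILE provider; the profile name "flink-test" and the region value are placeholders:

import java.util.Properties;

import com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;

public class CredentialsExample {
    public static void main(String[] args) {
        Properties config = new Properties();
        // the region key is mandatory for client creation; the value is an example
        config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
        // select the PROFILE provider instead of the default BASIC one
        config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
        config.setProperty(AWSConfigConstants.AWS_PROFILE_NAME, "flink-test");

        // resolves to a ProfileCredentialsProvider for the "flink-test" profile
        AWSCredentialsProvider provider = AWSUtil.getCredentialsProvider(config);
        System.out.println(provider.getClass().getSimpleName());
    }
}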
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
new file mode 100644
index 0000000..9aa14ad
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utilities for Flink Kinesis connector configuration.
+ */
+public class KinesisConfigUtil {
+
+ /**
+ * Validates configuration properties for {@link FlinkKinesisConsumer}.
+ */
+ public static void validateConsumerConfiguration(Properties config) {
+ checkNotNull(config, "config cannot be null");
+
+ validateAwsConfiguration(config);
+
+ if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
+ String initPosType = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);
+
+ // specified initial position in stream must be either LATEST or TRIM_HORIZON
+ try {
+ InitialPosition.valueOf(initPosType);
+ } catch (IllegalArgumentException e) {
+ StringBuilder sb = new StringBuilder();
+ for (InitialPosition pos : InitialPosition.values()) {
+ sb.append(pos.toString()).append(", ");
+ }
+ throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + sb.toString());
+ }
+ }
+
+ validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
+ "Invalid value given for maximum records per getRecords shard operation. Must be a valid non-negative integer value.");
+
+ validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
+ "Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value.");
+
+ validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
+ "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+ validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
+ "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+ validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
+ "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value.");
+
+ validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
+ "Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative long value.");
+
+ validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
+ "Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value.");
+
+ validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
+ "Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+ validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
+ "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+ validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
+ "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value.");
+
+ validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
+ "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");
+
+ validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
+ "Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+ validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
+ "Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+ validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
+ "Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value.");
+
+ if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
+ checkArgument(
+ Long.parseLong(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS))
+ < ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS,
+ "Invalid value given for getRecords sleep interval in milliseconds. Must be lower than " +
+ ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds."
+ );
+ }
+ }
+
+ /**
+ * Validates configuration properties for {@link FlinkKinesisProducer}.
+ */
+ public static void validateProducerConfiguration(Properties config) {
+ checkNotNull(config, "config cannot be null");
+
+ validateAwsConfiguration(config);
+
+ validateOptionalPositiveLongProperty(config, ProducerConfigConstants.COLLECTION_MAX_COUNT,
+ "Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid non-negative long value.");
+
+ validateOptionalPositiveLongProperty(config, ProducerConfigConstants.AGGREGATION_MAX_COUNT,
+ "Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid non-negative long value.");
+ }
+
+ /**
+ * Validates the AWS-related configuration properties.
+ */
+ public static void validateAwsConfiguration(Properties config) {
+ if (!config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
+ // if the credential provider type is not specified, it will default to BASIC later on,
+ // so the Access Key ID and Secret Key must be given
+ if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+ || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+ throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+ "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+ }
+ } else {
+ String credentialsProviderType = config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+
+ // value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable
+ CredentialProvider providerType;
+ try {
+ providerType = CredentialProvider.valueOf(credentialsProviderType);
+ } catch (IllegalArgumentException e) {
+ StringBuilder sb = new StringBuilder();
+ for (CredentialProvider type : CredentialProvider.values()) {
+ sb.append(type.toString()).append(", ");
+ }
+ throw new IllegalArgumentException("Invalid AWS Credential Provider Type set in config. Valid values are: " + sb.toString());
+ }
+
+ // if BASIC type is used, also check that the Access Key ID and Secret Key is supplied
+ if (providerType == CredentialProvider.BASIC) {
+ if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+ || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+ throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+ "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+ }
+ }
+ }
+
+ if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
+ throw new IllegalArgumentException("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+ } else {
+ // specified AWS Region name must be recognizable
+ if (!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) {
+ StringBuilder sb = new StringBuilder();
+ for (Regions region : Regions.values()) {
+ sb.append(region.getName()).append(", ");
+ }
+ throw new IllegalArgumentException("Invalid AWS region set in config. Valid values are: " + sb.toString());
+ }
+ }
+ }
+
+ private static void validateOptionalPositiveLongProperty(Properties config, String key, String message) {
+ if (config.containsKey(key)) {
+ try {
+ long value = Long.parseLong(config.getProperty(key));
+ if (value < 0) {
+ throw new NumberFormatException();
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(message);
+ }
+ }
+ }
+
+ private static void validateOptionalPositiveIntProperty(Properties config, String key, String message) {
+ if (config.containsKey(key)) {
+ try {
+ int value = Integer.parseInt(config.getProperty(key));
+ if (value < 0) {
+ throw new NumberFormatException();
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(message);
+ }
+ }
+ }
+
+ private static void validateOptionalPositiveDoubleProperty(Properties config, String key, String message) {
+ if (config.containsKey(key)) {
+ try {
+ double value = Double.parseDouble(config.getProperty(key));
+ if (value < 0) {
+ throw new NumberFormatException();
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(message);
+ }
+ }
+ }
+}
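As an illustration of what validateConsumerConfiguration() accepts, the following sketch builds a configuration with BASIC credentials and a couple of the optional settings checked above; all values are placeholders for this example:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;

public class ConsumerConfigExample {
    public static void main(String[] args) {
        Properties config = new Properties();
        // mandatory region plus BASIC credentials (the default provider type)
        config.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1");
        config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "example-access-key-id");
        config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "example-secret-key");

        // optional settings validated above: initial position and getRecords pacing
        config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        config.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500");

        // throws IllegalArgumentException if any value is missing or malformed
        KinesisConfigUtil.validateConsumerConfiguration(config);
    }
}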
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties
new file mode 100644
index 0000000..773f932
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger