Posted to issues@flink.apache.org by kailashhd <gi...@git.apache.org> on 2018/05/11 14:08:41 UTC

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

GitHub user kailashhd opened a pull request:

    https://github.com/apache/flink/pull/5992

    [FLINK-8944] [Kinesis Connector] Use listShards instead of DescribeStream for shard discovery as it offers higher rate limits
    
    ## What is the purpose of the change
    
    ListShards has higher AWS rate limits than DescribeStream (whose limit applies at the AWS account level). This allows faster shard discovery in the Kinesis data source when streams are changed (re-sharded).
    
    ## Brief change log
     - Change the Kinesis connector to use listShards instead of DescribeStream for shard discovery.
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
     - Added a unit test that exercises the code path with the Kinesis dependencies mocked out.
     - Tested by running a small Flink job with this connector.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / *no*)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / *no*)
      - The serializers: (yes / *no* / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / *no* / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / *no* / don't know)
      - The S3 file system connector: (yes / *no* / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / *no*)
      - If yes, how is the feature documented? (*not applicable* / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kailashhd/flink KinesisProxy

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5992.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5992
    
----
commit 3188f24b13c9009e977b6fb25da4d40c93fc811e
Author: Kailash HD <kd...@...>
Date:   2018-03-26T16:42:25Z

    [FLINK-8944] [Kinesis Connector] Use listShards instead of DescribeStream for shard discovery as it offer higher rate limits

----


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188378324
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     	/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
     	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
     
    -	/** The base backoff time between each describeStream attempt. */
    -	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --
    
    I think that this should be one unit of work.


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377434
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() {
     	}
     
     	@Test
    -	public void testCustomConfigurationOverride() {
    -		Properties configProps = new Properties();
    -		configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -		KinesisProxy proxy = new KinesisProxy(configProps) {
    -			@Override
    -			protected AmazonKinesis createKinesisClient(Properties configProps) {
    -				ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig();
    -				clientConfig.setSocketTimeout(10000);
    -				return AWSUtil.createKinesisClient(configProps, clientConfig);
    +	public void testGetShardList() throws Exception {
    +		List<String> shardIds =
    +				Arrays.asList(
    +						"shardId-000000000000",
    +						"shardId-000000000001",
    +						"shardId-000000000002",
    +						"shardId-000000000003");
    +		shardIdSet = new HashSet<>(shardIds);
    +		shards =
    +				shardIds
    +						.stream()
    +						.map(shardId -> new Shard().withShardId(shardId))
    +						.collect(Collectors.toList());
    +		Properties kinesisConsumerConfig = new Properties();
    +		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    +		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey");
    +		kinesisConsumerConfig.setProperty(
    +				ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey");
    +		KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
    +		AmazonKinesis mockClient = mock(AmazonKinesis.class);
    +		Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient);
    +
    +		ListShardsResult responseWithMoreData =
    +				new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
    +		ListShardsResult responseFinal =
    +				new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
    +		doReturn(responseWithMoreData)
    +				.when(mockClient)
    +				.listShards(argThat(initialListShardsRequestMatcher()));
    +		doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
    +		HashMap<String, String> streamHashMap =
    +				createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
    +		GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap);
    +
    +		Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
    +
    +		Set<String> expectedStreams = new HashSet<>();
    +		expectedStreams.add(fakeStreamName);
    +		Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams);
    +		List<StreamShardHandle> actualShardList =
    +				shardListResult.getRetrievedShardListOfStream(fakeStreamName);
    +		List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
    +		System.out.println(actualShardList.toString());
    +		assertThat(actualShardList, hasSize(4));
    +		for (int i = 0; i < 4; i++) {
    +			StreamShardHandle shardHandle =
    +					new StreamShardHandle(
    +							fakeStreamName,
    +							new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
    +			expectedStreamShard.add(shardHandle);
    +		}
    +
    +		Assert.assertThat(
    +				actualShardList,
    +				containsInAnyOrder(
    +						expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()])));
    +	}
    +
    +	private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
    +		private final String shardId;
    +		private final String nextToken;
    +
    +		ListShardsRequestMatcher(String shardIdArg, String nextTokenArg) {
    +			shardId = shardIdArg;
    +			nextToken = nextTokenArg;
    +		}
    +
    +		@Override
    +		protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) {
    +			if (shardId == null) {
    +				if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
    --- End diff --
    
    Done. Also note that quite a bit of code is reused from here: https://goo.gl/MGSRb3. Do I need to add a reference to that code?


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377415
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
     	private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
     		List<StreamShardHandle> shardsOfStream = new ArrayList<>();
     
    -		DescribeStreamResult describeStreamResult;
    +		// List Shards returns just the first 1000 shard entries. In order to read the entire stream,
    +		// we need to use the returned nextToken to get additional shards.
    +		ListShardsResult listShardsResult;
    +		String startShardToken = null;
     		do {
    -			describeStreamResult = describeStream(streamName, lastSeenShardId);
    -
    -			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +			listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
    --- End diff --
    
    Thanks for catching this. I have fixed it by clearing shardsOfStream, so that we return an empty shardsOfStream on an ExpiredNextTokenException.
    The intended behavior is: in the unlikely case that the next token expires, we simply return an empty shardsOfStream. This should be fine, because an empty shardsOfStream is also what is returned by default when no new shards are discovered.
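
The paging behavior described in the comment above can be sketched in isolation. This is a minimal illustration, not the connector's code: `Page`, `ShardLister`, `TokenExpiredException`, and `listAllShards` are hypothetical stand-ins for the AWS SDK types, and the fake listers exist only for the demo. It follows the next token until the last page and, if the token expires midway, clears the partial result so that the caller sees the same thing as "no new shards discovered".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-ins for the SDK types; not the connector's real classes. */
final class ShardPagingSketch {

    /** One page of shard ids plus the token for the next page (null on the last page). */
    static final class Page {
        final List<String> shardIds;
        final String nextToken;

        Page(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    /** Thrown by the fake listers when a token is no longer valid. */
    static final class TokenExpiredException extends RuntimeException {
        TokenExpiredException(String message) {
            super(message);
        }
    }

    /** Minimal paged listing API: a null token requests the first page. */
    interface ShardLister {
        Page list(String nextToken);
    }

    /**
     * Follows nextToken until the last page. If a token expires midway, the
     * partial result is cleared and an empty list is returned, so a later
     * discovery round can start over.
     */
    static List<String> listAllShards(ShardLister lister) {
        List<String> shardsOfStream = new ArrayList<>();
        String token = null;
        do {
            try {
                Page page = lister.list(token);
                shardsOfStream.addAll(page.shardIds);
                token = page.nextToken;
            } catch (TokenExpiredException e) {
                shardsOfStream.clear();
                break;
            }
        } while (token != null);
        return shardsOfStream;
    }

    /** Fake lister that serves four shards across two pages. */
    static ShardLister twoPages() {
        return token -> token == null
                ? new Page(Arrays.asList("shardId-000000000000", "shardId-000000000001"), "NextToken")
                : new Page(Arrays.asList("shardId-000000000002", "shardId-000000000003"), null);
    }

    /** Fake lister whose second call fails with an expired token. */
    static ShardLister expiresOnSecondPage() {
        return token -> {
            if (token == null) {
                return new Page(Arrays.asList("shardId-000000000000"), "NextToken");
            }
            throw new TokenExpiredException("token expired: " + token);
        };
    }

    public static void main(String[] args) {
        System.out.println(listAllShards(twoPages()));
        System.out.println(listAllShards(expiresOnSecondPage()));
    }
}
```

Returning an empty list on expiry trades completeness in one round for simplicity: nothing partial is reported, and the shards are picked up the next time discovery runs.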


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r190140597
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -26,20 +29,60 @@
     import com.amazonaws.ClientConfigurationFactory;
     import com.amazonaws.services.kinesis.AmazonKinesis;
     import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ListShardsRequest;
    +import com.amazonaws.services.kinesis.model.ListShardsResult;
     import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +import com.amazonaws.services.kinesis.model.Shard;
    +import org.hamcrest.Description;
    +import org.hamcrest.TypeSafeDiagnosingMatcher;
    +import org.junit.Assert;
     import org.junit.Test;
     import org.powermock.reflect.Whitebox;
     
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
     import java.util.Properties;
    +import java.util.Set;
    +import java.util.stream.Collectors;
     
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
    +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
     import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertFalse;
     import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.argThat;
    +import static org.mockito.Mockito.doReturn;
    +import static org.mockito.Mockito.mock;
     
     /**
      * Test for methods in the {@link KinesisProxy} class.
      */
     public class KinesisProxyTest {
    +	private static final String NEXT_TOKEN = "NextToken";
    +	private static final String fakeStreamName = "fake-stream";
    +	private Set<String> shardIdSet;
    +	private List<Shard> shards;
    --- End diff --
    
    Should we move these so that they are scoped only to the `testGetShardList` test method?


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377376
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
     	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
     	 * @return the result of the describe stream operation
     	 */
    -	private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    -		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
    -		describeStreamRequest.setStreamName(streamName);
    -		describeStreamRequest.setExclusiveStartShardId(startShardId);
    +	private ListShardsResult listShards(String streamName, @Nullable String startShardId,
    +																			@Nullable String startNextToken)
    +			throws InterruptedException {
    +		final ListShardsRequest listShardsRequest = new ListShardsRequest();
    +		if (startNextToken == null) {
    +			listShardsRequest.setExclusiveStartShardId(startShardId);
    +			listShardsRequest.setStreamName(streamName);
    +		} else {
    +			// Note the nextToken returned by AWS expires within 300 sec.
    +			listShardsRequest.setNextToken(startNextToken);
    +		}
     
    -		DescribeStreamResult describeStreamResult = null;
    +		ListShardsResult listShardsResults = null;
     
    -		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
    +		// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
     		int attemptCount = 0;
    -		while (describeStreamResult == null) { // retry until we get a result
    +		// List Shards returns just the first 1000 shard entries. Make sure that all entries
    +		// are taken up.
    +		while (listShardsResults == null) { // retry until we get a result
     			try {
    -				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
    +				listShardsResults = kinesisClient.listShards(listShardsRequest);
     			} catch (LimitExceededException le) {
     				long backoffMillis = fullJitterBackoff(
    -					describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
    -				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
    -					+ backoffMillis + " millis.");
    +						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
    +					LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
    +									+ ". Backing off for " + backoffMillis + " millis.");
     				Thread.sleep(backoffMillis);
    -			} catch (ResourceNotFoundException re) {
    -				throw new RuntimeException("Error while getting stream details", re);
    -			}
    -		}
    -
    -		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
    -		if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
    -			if (LOG.isWarnEnabled()) {
    -				LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
    -					"describeStream operation will not contain any shard information.");
    +			} catch (ResourceInUseException reInUse) {
    +				if (LOG.isWarnEnabled()) {
    +					// List Shards will throw an exception if stream in not in active state. Will return
    +					LOG.warn("The stream is currently not in active state. Reusing the older state "
    +							+ "for the time being");
    +					break;
    +				}
    +			} catch (ResourceNotFoundException reNotFound) {
    +				throw new RuntimeException("Stream not found. Error while getting shard list.", reNotFound);
    +			} catch (InvalidArgumentException inArg) {
    +				throw new RuntimeException("Invalid Arguments to listShards.", inArg);
    +			} catch (ExpiredNextTokenException expiredToken) {
    +				LOG.warn("List Shards has an expired token. Reusing the previous state.");
    +				break;
     			}
     		}
    -
    -		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
    -		// start shard id in the returned shards list; check if we need to remove these erroneously returned shards
    -		if (startShardId != null) {
    -			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +    // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
    --- End diff --
    
    Done.


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188481562
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     	/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
     	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
     
    -	/** The base backoff time between each describeStream attempt. */
    -	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --
    
    I have deprecated some old properties and added the new ones. This is my first time using the `@Deprecated` annotation, so do let me know if there are better ways of doing this. I used sample PRs like https://goo.gl/LWcrp2 for these changes.
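
For readers unfamiliar with the pattern, a common way to deprecate a constant in Java is to pair the `@Deprecated` annotation with a Javadoc `@deprecated` tag that points at the replacement. The sketch below is illustrative only: the class name `ConsumerConfigConstantsSketch`, the constant `LIST_SHARDS_BACKOFF_BASE`, and its key string are assumptions and are not taken from the PR. Only the old constant and its key appear in the diff above.

```java
/** Hypothetical excerpt; the new constant's name and key string are assumptions. */
final class ConsumerConfigConstantsSketch {

    /** The base backoff time between each listShards attempt (assumed name and key). */
    static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

    /**
     * The base backoff time between each describeStream attempt.
     *
     * @deprecated Shard discovery uses listShards; use {@link #LIST_SHARDS_BACKOFF_BASE} instead.
     */
    @Deprecated
    static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

    private ConsumerConfigConstantsSketch() {
        // Constants holder; not meant to be instantiated.
    }
}
```

The annotation makes the compiler warn at use sites, while the Javadoc tag tells users what to migrate to.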
    
    



---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187632455
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -66,13 +66,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
     
     	/** The base backoff time between each describeStream attempt. */
    -	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --
    
    I have kept the property string values the same but renamed the constants to refer to listShards for readability.


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188378986
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -97,8 +228,7 @@ public void testClientConfigOverride() {
     
     		AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
     		ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
    -			"clientConfiguration");
    +						"clientConfiguration");
    --- End diff --
    
    remove unnecessary change


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188481796
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
    @@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
     		return configProps;
     	}
     
    +	public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
    +		if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {
    --- End diff --
    
    Minor: this could be generalized by iterating over a map of oldkey -> newkey. I would also suggest logging a warning for deprecated keys.
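
The suggestion above could look roughly like the following. This is a sketch under stated assumptions, not the code that was merged: the class name `DeprecatedKeySketch` and the new key string `flink.list.shards.backoff.base` are hypothetical, `java.util.logging` is used only to keep the example self-contained, and letting an explicitly set new key take precedence over the old one is a design choice made here for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Logger;

final class DeprecatedKeySketch {

    private static final Logger LOG = Logger.getLogger(DeprecatedKeySketch.class.getName());

    /** Old key -> new key. The new key string is an assumption for illustration. */
    static final Map<String, String> DEPRECATED_CONSUMER_KEYS = new LinkedHashMap<>();

    static {
        DEPRECATED_CONSUMER_KEYS.put(
                "flink.stream.describe.backoff.base", "flink.list.shards.backoff.base");
    }

    /** Moves values from deprecated keys to their replacements, warning once per key found. */
    static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
        for (Map.Entry<String, String> entry : DEPRECATED_CONSUMER_KEYS.entrySet()) {
            String oldKey = entry.getKey();
            String newKey = entry.getValue();
            if (configProps.containsKey(oldKey)) {
                LOG.warning("Property " + oldKey + " is deprecated; use " + newKey + " instead.");
                // Assumption: a value set explicitly under the new key wins over the old one.
                if (!configProps.containsKey(newKey)) {
                    configProps.setProperty(newKey, configProps.getProperty(oldKey));
                }
                configProps.remove(oldKey);
            }
        }
        return configProps;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("flink.stream.describe.backoff.base", "500");
        System.out.println(replaceDeprecatedConsumerKeys(props));
    }
}
```

Keeping the mapping in one place means that adding another deprecated key is a one-line change, and the warning is issued from a single code path.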


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377393
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
     	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
     	 * @return the result of the describe stream operation
     	 */
    -	private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    -		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
    -		describeStreamRequest.setStreamName(streamName);
    -		describeStreamRequest.setExclusiveStartShardId(startShardId);
    +	private ListShardsResult listShards(String streamName, @Nullable String startShardId,
    +																			@Nullable String startNextToken)
    +			throws InterruptedException {
    +		final ListShardsRequest listShardsRequest = new ListShardsRequest();
    +		if (startNextToken == null) {
    +			listShardsRequest.setExclusiveStartShardId(startShardId);
    +			listShardsRequest.setStreamName(streamName);
    +		} else {
    +			// Note the nextToken returned by AWS expires within 300 sec.
    +			listShardsRequest.setNextToken(startNextToken);
    +		}
     
    -		DescribeStreamResult describeStreamResult = null;
    +		ListShardsResult listShardsResults = null;
     
    -		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
    +		// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
     		int attemptCount = 0;
    -		while (describeStreamResult == null) { // retry until we get a result
    +		// List Shards returns just the first 1000 shard entries. Make sure that all entries
    +		// are taken up.
    +		while (listShardsResults == null) { // retry until we get a result
     			try {
    -				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
    +				listShardsResults = kinesisClient.listShards(listShardsRequest);
     			} catch (LimitExceededException le) {
     				long backoffMillis = fullJitterBackoff(
    -					describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
    -				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
    -					+ backoffMillis + " millis.");
    +						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
    +					LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
    +									+ ". Backing off for " + backoffMillis + " millis.");
     				Thread.sleep(backoffMillis);
    -			} catch (ResourceNotFoundException re) {
    -				throw new RuntimeException("Error while getting stream details", re);
    -			}
    -		}
    -
    -		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
    -		if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
    -			if (LOG.isWarnEnabled()) {
    -				LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
    -					"describeStream operation will not contain any shard information.");
    +			} catch (ResourceInUseException reInUse) {
    +				if (LOG.isWarnEnabled()) {
    +					// List Shards will throw an exception if stream in not in active state. Will return
    +					LOG.warn("The stream is currently not in active state. Reusing the older state "
    +							+ "for the time being");
    +					break;
    +				}
    +			} catch (ResourceNotFoundException reNotFound) {
    +				throw new RuntimeException("Stream not found. Error while getting shard list.", reNotFound);
    +			} catch (InvalidArgumentException inArg) {
    +				throw new RuntimeException("Invalid Arguments to listShards.", inArg);
    +			} catch (ExpiredNextTokenException expiredToken) {
    +				LOG.warn("List Shards has an expired token. Reusing the previous state.");
    +				break;
     			}
     		}
    -
    -		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
    -		// start shard id in the returned shards list; check if we need to remove these erroneously returned shards
    -		if (startShardId != null) {
    -			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +    // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
    +		// the exclusive start shard id in the returned shards list; check if we need to remove
    +		// these erroneously returned shards.
    +		if (startShardId != null && listShardsResults != null) {
    +			List<Shard> shards = listShardsResults.getShards();
     			Iterator<Shard> shardItr = shards.iterator();
    -			while (shardItr.hasNext()) {
    +			while (shardItr.hasNext()){
    --- End diff --
    
    Done. 


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r189417750
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
    @@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
     		return configProps;
     	}
     
    +	public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
    +		if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {
    --- End diff --
    
    Done.


---

[GitHub] flink issue #5992: [FLINK-8944] [Kinesis Connector] Use listShards instead o...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the issue:

    https://github.com/apache/flink/pull/5992
  
    Looks like we can either leave all the application-facing constants as they are (which might look a bit strange, given that they refer to the old API while we now use the new one), or we deprecate and duplicate :) If the latter, then I would suggest mapping from old/deprecated to new property keys in a single place and issuing a deprecation warning.


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788278
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
     	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
     	 * @return the result of the describe stream operation
     	 */
    -	private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    -		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
    -		describeStreamRequest.setStreamName(streamName);
    -		describeStreamRequest.setExclusiveStartShardId(startShardId);
    +	private ListShardsResult listShards(String streamName, @Nullable String startShardId,
    +																			@Nullable String startNextToken)
    +			throws InterruptedException {
    +		final ListShardsRequest listShardsRequest = new ListShardsRequest();
    +		if (startNextToken == null) {
    +			listShardsRequest.setExclusiveStartShardId(startShardId);
    +			listShardsRequest.setStreamName(streamName);
    +		} else {
    +			// Note the nextToken returned by AWS expires within 300 sec.
    +			listShardsRequest.setNextToken(startNextToken);
    +		}
     
    -		DescribeStreamResult describeStreamResult = null;
    +		ListShardsResult listShardsResults = null;
     
    -		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
    +		// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
     		int attemptCount = 0;
    -		while (describeStreamResult == null) { // retry until we get a result
    +		// List Shards returns just the first 1000 shard entries. Make sure that all entries
    +		// are taken up.
    +		while (listShardsResults == null) { // retry until we get a result
     			try {
    -				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
    +				listShardsResults = kinesisClient.listShards(listShardsRequest);
     			} catch (LimitExceededException le) {
     				long backoffMillis = fullJitterBackoff(
    -					describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
    -				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
    -					+ backoffMillis + " millis.");
    +						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
    +					LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
    +									+ ". Backing off for " + backoffMillis + " millis.");
     				Thread.sleep(backoffMillis);
    -			} catch (ResourceNotFoundException re) {
    -				throw new RuntimeException("Error while getting stream details", re);
    -			}
    -		}
    -
    -		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
    -		if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
    -			if (LOG.isWarnEnabled()) {
    -				LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
    -					"describeStream operation will not contain any shard information.");
    +			} catch (ResourceInUseException reInUse) {
    +				if (LOG.isWarnEnabled()) {
    +					// List Shards will throw an exception if stream in not in active state. Will return
    +					LOG.warn("The stream is currently not in active state. Reusing the older state "
    +							+ "for the time being");
    +					break;
    +				}
    +			} catch (ResourceNotFoundException reNotFound) {
    +				throw new RuntimeException("Stream not found. Error while getting shard list.", reNotFound);
    +			} catch (InvalidArgumentException inArg) {
    +				throw new RuntimeException("Invalid Arguments to listShards.", inArg);
    +			} catch (ExpiredNextTokenException expiredToken) {
    +				LOG.warn("List Shards has an expired token. Reusing the previous state.");
    +				break;
     			}
     		}
    -
    -		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
    -		// start shard id in the returned shards list; check if we need to remove these erroneously returned shards
    -		if (startShardId != null) {
    -			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +    // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
    --- End diff --
    
    incorrect indentation


---

[GitHub] flink issue #5992: [FLINK-8944] [Kinesis Connector] Use listShards instead o...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on the issue:

    https://github.com/apache/flink/pull/5992
  
    Bumping this up in case it got lost in the queue. @tzulitai 


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788223
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     	/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
     	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
     
    -	/** The base backoff time between each describeStream attempt. */
    -	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --
    
    The same goes for all the other key variable renames in this class.
    I would suggest deprecating the existing keys if we want to change the names internally.
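
One way to read the deprecation suggestion above, as a minimal sketch rather than what the PR actually does: keep the old key, mark it `@Deprecated`, and fall back to it when the new key is absent. The new key name `flink.list.shards.backoff.base`, the class `BackoffConfigSketch`, and the helper `backoffBaseMillis` are assumptions made for illustration only.

```java
import java.util.Properties;

/** Illustrative only; not the connector's ConsumerConfigConstants class. */
class BackoffConfigSketch {

	/** Existing key from the diff, kept so that current user configurations keep working. */
	@Deprecated
	static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

	/** Hypothetical replacement key; the name is an assumption, not taken from the PR. */
	static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

	/** Reads the new key first and falls back to the deprecated key, then to the default. */
	static long backoffBaseMillis(Properties config, long defaultMillis) {
		String value = config.getProperty(LIST_SHARDS_BACKOFF_BASE);
		if (value == null) {
			value = config.getProperty(STREAM_DESCRIBE_BACKOFF_BASE);
		}
		return value == null ? defaultMillis : Long.parseLong(value);
	}
}
```

With this shape, a job that still sets only the old key behaves as before, and the new key wins when both are present.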


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788213
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     	/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
     	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
     
    -	/** The base backoff time between each describeStream attempt. */
    -	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --
    
    Changing the name of this variable, strictly speaking, breaks backwards compatibility, as users might be using it.


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188481158
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -97,8 +228,7 @@ public void testClientConfigOverride() {
     
     		AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
     		ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
    -			"clientConfiguration");
    +						"clientConfiguration");
     		assertEquals(9999, clientConfiguration.getSocketTimeout());
     	}
    -
    --- End diff --
    
    Done. Sorry for the inconvenience.


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788338
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
     	private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
     		List<StreamShardHandle> shardsOfStream = new ArrayList<>();
     
    -		DescribeStreamResult describeStreamResult;
    +		// List Shards returns just the first 1000 shard entries. In order to read the entire stream,
    +		// we need to use the returned nextToken to get additional shards.
    +		ListShardsResult listShardsResult;
    +		String startShardToken = null;
     		do {
    -			describeStreamResult = describeStream(streamName, lastSeenShardId);
    -
    -			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +			listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
    --- End diff --
    
    If we catch the `ExpiredNextTokenException` within `listShards(...)`, then `null` will be returned as the result, correct?
    If so, then the `shardsOfStream` list built up so far will be returned immediately, regardless of whether or not there are more shards following.
    
    Although expired tokens might not be too common here, I wonder if we can handle this case more gracefully (e.g., re-fetching a token to make sure that there really are no more shards).
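
The more graceful handling asked about above could look roughly like the sketch below: when a token expires, drop it and restart listing from the last shard already collected instead of returning a partial result. This is a sketch under stated assumptions, not the PR's code. `ShardLister`, `Page`, and `ExpiredTokenException` are hypothetical stand-ins for the AWS SDK types, and it assumes that a request with an exclusive start shard id resumes right after that shard.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only; all nested types are hypothetical stand-ins, not AWS SDK classes. */
class ShardPagingSketch {

	/** Hypothetical signal that a pagination token has expired. */
	static class ExpiredTokenException extends RuntimeException {}

	/** One page of shard ids plus the token for the next page (null when there are no more pages). */
	static class Page {
		final List<String> shardIds;
		final String nextToken;

		Page(List<String> shardIds, String nextToken) {
			this.shardIds = shardIds;
			this.nextToken = nextToken;
		}
	}

	/** Hypothetical client: takes either an exclusive start shard id or a next token, never both. */
	interface ShardLister {
		Page list(String exclusiveStartShardId, String nextToken);
	}

	static List<String> listAllShards(ShardLister lister, String lastSeenShardId) {
		List<String> result = new ArrayList<>();
		String startShardId = lastSeenShardId;
		String token = null;
		while (true) {
			Page page;
			try {
				page = (token == null) ? lister.list(startShardId, null) : lister.list(null, token);
			} catch (ExpiredTokenException e) {
				if (token == null) {
					throw e; // no token was sent, so retrying the same request cannot help
				}
				token = null; // drop the expired token and resume after the last collected shard
				continue;
			}
			result.addAll(page.shardIds);
			if (!page.shardIds.isEmpty()) {
				startShardId = page.shardIds.get(page.shardIds.size() - 1);
			}
			token = page.nextToken;
			if (token == null) {
				return result;
			}
		}
	}
}
```

The sketch leaves out backoff and a retry limit; a real implementation would likely want both.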


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r191033585
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -26,20 +29,60 @@
     import com.amazonaws.ClientConfigurationFactory;
     import com.amazonaws.services.kinesis.AmazonKinesis;
     import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ListShardsRequest;
    +import com.amazonaws.services.kinesis.model.ListShardsResult;
     import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +import com.amazonaws.services.kinesis.model.Shard;
    +import org.hamcrest.Description;
    +import org.hamcrest.TypeSafeDiagnosingMatcher;
    +import org.junit.Assert;
     import org.junit.Test;
     import org.powermock.reflect.Whitebox;
     
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
     import java.util.Properties;
    +import java.util.Set;
    +import java.util.stream.Collectors;
     
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
    +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
     import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertFalse;
     import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.argThat;
    +import static org.mockito.Mockito.doReturn;
    +import static org.mockito.Mockito.mock;
     
     /**
      * Test for methods in the {@link KinesisProxy} class.
      */
     public class KinesisProxyTest {
    +	private static final String NEXT_TOKEN = "NextToken";
    +	private static final String fakeStreamName = "fake-stream";
    +	private Set<String> shardIdSet;
    +	private List<Shard> shards;
    --- End diff --
    
    Done


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5992


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788363
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() {
     	}
     
     	@Test
    -	public void testCustomConfigurationOverride() {
    -		Properties configProps = new Properties();
    -		configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -		KinesisProxy proxy = new KinesisProxy(configProps) {
    -			@Override
    -			protected AmazonKinesis createKinesisClient(Properties configProps) {
    -				ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig();
    -				clientConfig.setSocketTimeout(10000);
    -				return AWSUtil.createKinesisClient(configProps, clientConfig);
    +	public void testGetShardList() throws Exception {
    +		List<String> shardIds =
    +				Arrays.asList(
    +						"shardId-000000000000",
    +						"shardId-000000000001",
    +						"shardId-000000000002",
    +						"shardId-000000000003");
    +		shardIdSet = new HashSet<>(shardIds);
    +		shards =
    +				shardIds
    +						.stream()
    +						.map(shardId -> new Shard().withShardId(shardId))
    +						.collect(Collectors.toList());
    +		Properties kinesisConsumerConfig = new Properties();
    +		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    +		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey");
    +		kinesisConsumerConfig.setProperty(
    +				ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey");
    +		KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
    +		AmazonKinesis mockClient = mock(AmazonKinesis.class);
    +		Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient);
    +
    +		ListShardsResult responseWithMoreData =
    +				new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
    +		ListShardsResult responseFinal =
    +				new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
    +		doReturn(responseWithMoreData)
    +				.when(mockClient)
    +				.listShards(argThat(initialListShardsRequestMatcher()));
    +		doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
    +		HashMap<String, String> streamHashMap =
    +				createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
    +		GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap);
    +
    +		Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
    +
    +		Set<String> expectedStreams = new HashSet<>();
    +		expectedStreams.add(fakeStreamName);
    +		Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams);
    +		List<StreamShardHandle> actualShardList =
    +				shardListResult.getRetrievedShardListOfStream(fakeStreamName);
    +		List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
    +		System.out.println(actualShardList.toString());
    +		assertThat(actualShardList, hasSize(4));
    +		for (int i = 0; i < 4; i++) {
    +			StreamShardHandle shardHandle =
    +					new StreamShardHandle(
    +							fakeStreamName,
    +							new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
    +			expectedStreamShard.add(shardHandle);
    +		}
    +
    +		Assert.assertThat(
    +				actualShardList,
    +				containsInAnyOrder(
    +						expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()])));
    +	}
    +
    +	private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
    +		private final String shardId;
    +		private final String nextToken;
    +
    +		ListShardsRequestMatcher(String shardIdArg, String nextTokenArg) {
    +			shardId = shardIdArg;
    +			nextToken = nextTokenArg;
    +		}
    +
    +		@Override
    +		protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) {
    +			if (shardId == null) {
    +				if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
    --- End diff --
    
    Can we avoid using `StringUtils` and just use `!String.isEmpty()` instead?
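
A note on the suggestion above, with a small sketch: `String.isEmpty()` is an instance method, so calling it on a `null` value throws a `NullPointerException`. The diff does not show which `StringUtils` is imported; assuming it is the commons-lang one, `isNotEmpty` there is null-safe, so a plain-Java replacement needs an explicit null check. The helper name `isNotEmpty` and the class `EmptyCheckSketch` are illustrative.

```java
/** Illustrative only; a plain-Java, null-safe equivalent of a "not empty" check. */
class EmptyCheckSketch {

	/** Returns true only for a non-null string with at least one character. */
	static boolean isNotEmpty(String s) {
		return s != null && !s.isEmpty();
	}
}
```

This matters in the matcher, because the request's exclusive start shard id can be `null` for the initial request.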


---

[GitHub] flink issue #5992: [FLINK-8944] [Kinesis Connector] Use listShards instead o...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on the issue:

    https://github.com/apache/flink/pull/5992
  
    R: @tzulitai @tweise 



---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377363
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     	/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
     	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
     
    -	/** The base backoff time between each describeStream attempt. */
    -	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --
    
    For the time being, I will keep the older names and deprecate the property names in a follow-up PR. I am not sure what the policy is with respect to change size, but I feel that splitting this into two PRs will make it easier to review. Let me know if you feel otherwise, and I will pull those changes into this PR instead.


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788284
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
     	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
     	 * @return the result of the describe stream operation
     	 */
    -	private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    -		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
    -		describeStreamRequest.setStreamName(streamName);
    -		describeStreamRequest.setExclusiveStartShardId(startShardId);
    +	private ListShardsResult listShards(String streamName, @Nullable String startShardId,
    +																			@Nullable String startNextToken)
    +			throws InterruptedException {
    +		final ListShardsRequest listShardsRequest = new ListShardsRequest();
    +		if (startNextToken == null) {
    +			listShardsRequest.setExclusiveStartShardId(startShardId);
    +			listShardsRequest.setStreamName(streamName);
    +		} else {
    +			// Note the nextToken returned by AWS expires within 300 sec.
    +			listShardsRequest.setNextToken(startNextToken);
    +		}
     
    -		DescribeStreamResult describeStreamResult = null;
    +		ListShardsResult listShardsResults = null;
     
    -		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
    +		// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
     		int attemptCount = 0;
    -		while (describeStreamResult == null) { // retry until we get a result
    +		// List Shards returns just the first 1000 shard entries. Make sure that all entries
    +		// are taken up.
    +		while (listShardsResults == null) { // retry until we get a result
     			try {
    -				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
    +				listShardsResults = kinesisClient.listShards(listShardsRequest);
     			} catch (LimitExceededException le) {
     				long backoffMillis = fullJitterBackoff(
    -					describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
    -				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
    -					+ backoffMillis + " millis.");
    +						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
    +					LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
    +									+ ". Backing off for " + backoffMillis + " millis.");
     				Thread.sleep(backoffMillis);
    -			} catch (ResourceNotFoundException re) {
    -				throw new RuntimeException("Error while getting stream details", re);
    -			}
    -		}
    -
    -		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
    -		if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
    -			if (LOG.isWarnEnabled()) {
    -				LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
    -					"describeStream operation will not contain any shard information.");
    +			} catch (ResourceInUseException reInUse) {
    +				if (LOG.isWarnEnabled()) {
    +					// List Shards will throw an exception if stream in not in active state. Will return
    +					LOG.warn("The stream is currently not in active state. Reusing the older state "
    +							+ "for the time being");
    +					break;
    +				}
    +			} catch (ResourceNotFoundException reNotFound) {
    +				throw new RuntimeException("Stream not found. Error while getting shard list.", reNotFound);
    +			} catch (InvalidArgumentException inArg) {
    +				throw new RuntimeException("Invalid Arguments to listShards.", inArg);
    +			} catch (ExpiredNextTokenException expiredToken) {
    +				LOG.warn("List Shards has an expired token. Reusing the previous state.");
    +				break;
     			}
     		}
    -
    -		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
    -		// start shard id in the returned shards list; check if we need to remove these erroneously returned shards
    -		if (startShardId != null) {
    -			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +    // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
    +		// the exclusive start shard id in the returned shards list; check if we need to remove
    +		// these erroneously returned shards.
    +		if (startShardId != null && listShardsResults != null) {
    +			List<Shard> shards = listShardsResults.getShards();
     			Iterator<Shard> shardItr = shards.iterator();
    -			while (shardItr.hasNext()) {
    +			while (shardItr.hasNext()){
    --- End diff --
    
    code style nit: missing space before `{`
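
For readers unfamiliar with the retry loop in the diff above, here is a minimal sketch of what a full-jitter backoff typically computes: an exponentially growing cap, bounded by a maximum, with the actual sleep time drawn uniformly below that cap. The parameter order mirrors the `fullJitterBackoff(...)` call in the diff, but the body and the explicit `Random` argument are assumptions, not the connector's implementation.

```java
import java.util.Random;

/** Illustrative only; an assumed shape for a full-jitter backoff helper. */
class FullJitterBackoffSketch {

	/**
	 * Returns a random sleep time in [0, cap), where
	 * cap = min(maxMillis, baseMillis * power^attempt).
	 */
	static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt, Random random) {
		long cap = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
		return (long) (random.nextDouble() * cap);
	}
}
```

Randomizing over the whole interval spreads out retries from many subtasks that hit the rate limit at the same time.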


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187630051
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -66,13 +66,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
     
     	/** The base backoff time between each describeStream attempt. */
    -	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --
    
    Would it be better to include a new set of constants for listShards? Semantically this is equivalent, so reusing the same names for listShards should be okay, but it is a little confusing.


---

[GitHub] flink issue #5992: [FLINK-8944] [Kinesis Connector] Use listShards instead o...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on the issue:

    https://github.com/apache/flink/pull/5992
  
    Please note that I rebased my changes onto master, as I was getting some failures when building just the flink-connector-kinesis Maven project. I hope this will not cause problems with reviewing the code changes. Sorry for the inconvenience if it does. :(


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r190140724
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -26,20 +29,60 @@
     import com.amazonaws.ClientConfigurationFactory;
     import com.amazonaws.services.kinesis.AmazonKinesis;
     import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ListShardsRequest;
    +import com.amazonaws.services.kinesis.model.ListShardsResult;
     import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +import com.amazonaws.services.kinesis.model.Shard;
    +import org.hamcrest.Description;
    +import org.hamcrest.TypeSafeDiagnosingMatcher;
    +import org.junit.Assert;
     import org.junit.Test;
     import org.powermock.reflect.Whitebox;
     
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
     import java.util.Properties;
    +import java.util.Set;
    +import java.util.stream.Collectors;
     
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
    +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
     import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertFalse;
     import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.argThat;
    +import static org.mockito.Mockito.doReturn;
    +import static org.mockito.Mockito.mock;
     
     /**
      * Test for methods in the {@link KinesisProxy} class.
      */
     public class KinesisProxyTest {
    +	private static final String NEXT_TOKEN = "NextToken";
    +	private static final String fakeStreamName = "fake-stream";
    +	private Set<String> shardIdSet;
    +	private List<Shard> shards;
    +
    +	protected static HashMap<String, String>
    +	createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
    +		HashMap<String, String> initial = new HashMap<>();
    +		for (String stream : streams) {
    +			initial.put(stream, null);
    +		}
    +		return initial;
    +	}
    +
    +	private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
    +		return new ListShardsRequestMatcher(null, null);
    +	}
    +
    +	private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
    --- End diff --
    
    nit: IMO, it would help with readability if we move these private utility methods after the main test ones.


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188379050
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -97,8 +228,7 @@ public void testClientConfigOverride() {
     
     		AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
     		ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
    -			"clientConfiguration");
    +						"clientConfiguration");
     		assertEquals(9999, clientConfiguration.getSocketTimeout());
     	}
    -
    --- End diff --
    
    remove unnecessary change


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r191033584
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -26,20 +29,60 @@
     import com.amazonaws.ClientConfigurationFactory;
     import com.amazonaws.services.kinesis.AmazonKinesis;
     import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ListShardsRequest;
    +import com.amazonaws.services.kinesis.model.ListShardsResult;
     import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +import com.amazonaws.services.kinesis.model.Shard;
    +import org.hamcrest.Description;
    +import org.hamcrest.TypeSafeDiagnosingMatcher;
    +import org.junit.Assert;
     import org.junit.Test;
     import org.powermock.reflect.Whitebox;
     
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
     import java.util.Properties;
    +import java.util.Set;
    +import java.util.stream.Collectors;
     
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
    +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
     import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertFalse;
     import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.argThat;
    +import static org.mockito.Mockito.doReturn;
    +import static org.mockito.Mockito.mock;
     
     /**
      * Test for methods in the {@link KinesisProxy} class.
      */
     public class KinesisProxyTest {
    +	private static final String NEXT_TOKEN = "NextToken";
    +	private static final String fakeStreamName = "fake-stream";
    +	private Set<String> shardIdSet;
    +	private List<Shard> shards;
    +
    +	protected static HashMap<String, String>
    +	createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
    +		HashMap<String, String> initial = new HashMap<>();
    +		for (String stream : streams) {
    +			initial.put(stream, null);
    +		}
    +		return initial;
    +	}
    +
    +	private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
    +		return new ListShardsRequestMatcher(null, null);
    +	}
    +
    +	private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
    --- End diff --
    
    Done


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187627524
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -66,13 +66,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
     
     	/** The base backoff time between each describeStream attempt. */
    -	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --
    
    This would be a breaking change. We should leave these properties as is, if they are semantically equivalent.


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

Posted by kailashhd <gi...@git.apache.org>.
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188481152
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -97,8 +228,7 @@ public void testClientConfigOverride() {
     
     		AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
     		ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
    -			"clientConfiguration");
    +						"clientConfiguration");
    --- End diff --
    
    Done. Sorry for the inconvenience.


---