Posted to issues@flink.apache.org by skidder <gi...@git.apache.org> on 2017/01/06 18:21:07 UTC

[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

GitHub user skidder opened a pull request:

    https://github.com/apache/flink/pull/3078

    [FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

    My Flink job that consumes from a Kinesis stream must be restarted at least once daily due to an uncaught AmazonKinesisException when reading from Kinesis. The complete stacktrace looks like:
    
    ```
    com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
    	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
    	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
    	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
    	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
    	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
    	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
    	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
    	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
    	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
    	at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
    	at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
    	at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
    	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
    	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
    	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:745)
    ```
    It's interesting that the Kinesis endpoint returned a 500 status code, but that's outside the scope of this issue.
    
    I think we can handle this exception in the same manner as a ProvisionedThroughputExceededException: perform an exponential backoff and retry a finite number of times before throwing an exception.
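    
    As a minimal sketch of the backoff-and-retry idea (not the code in this pull request), the snippet below retries a failing call a bounded number of times and doubles the wait after each failure. The class `BackoffRetrySketch`, the `Attempt` interface, and all parameter names are hypothetical and chosen for illustration only. For brevity it retries on any exception, whereas a real connector would presumably retry only on errors it considers recoverable.
    
    ```java
    // Hypothetical sketch: none of these names come from the Flink Kinesis connector.
    class BackoffRetrySketch {
    
        /** A call that may fail with a transient error. */
        interface Attempt<T> {
            T call() throws Exception;
        }
    
        /** Delay before retry number `attempt` (1-based): base * power^(attempt - 1), capped at max. */
        static long backoffMillis(long baseMillis, long maxMillis, double power, int attempt) {
            double delay = baseMillis * Math.pow(power, attempt - 1);
            return (long) Math.min(delay, (double) maxMillis);
        }
    
        /** Runs the call, retrying at most maxRetries times, then rethrows the last failure. */
        static <T> T callWithRetries(Attempt<T> call, int maxRetries, long baseMillis, long maxMillis)
                throws Exception {
            int retries = 0;
            while (true) {
                try {
                    return call.call();
                } catch (Exception e) {
                    if (retries >= maxRetries) {
                        throw e; // retries exhausted: surface the failure to the caller
                    }
                    retries++;
                    // Wait longer after each consecutive failure (power fixed at 2.0 here).
                    Thread.sleep(backoffMillis(baseMillis, maxMillis, 2.0, retries));
                }
            }
        }
    }
    ```
    
    With a base of 100 ms and a power of 2.0, the waits grow as 100 ms, 200 ms, 400 ms and so on until they reach the cap, so a short service hiccup is absorbed without restarting the job.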

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/skidder/flink skidder/flink-5355

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3078.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3078
    
----
commit 85adf13f62351675e39811b9cb58aa2ac9a9cd4d
Author: Scott Kidder <sc...@mux.com>
Date:   2016-12-16T16:46:54Z

    [FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145771
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.proxy;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import com.amazonaws.AmazonServiceException;
    +import com.amazonaws.AmazonServiceException.ErrorType;
    +import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +
    +/**
    + * Test for methods in the KinesisProxy class.
    + * 
    --- End diff --
    
    Extra empty line



[GitHub] flink issue #3078: [FLINK-5355] Handle AmazonKinesisException gracefully in ...

Posted by skidder <gi...@git.apache.org>.
Github user skidder commented on the issue:

    https://github.com/apache/flink/pull/3078
  
    Thanks @tzulitai , great feedback!
    
    A `ProvisionedThroughputExceededException` is reported with an HTTP 400 response status code:
    http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#API_GetRecords_Errors
    
    The AWS SDK will assign the `ErrorType.Client` type to all exceptions with an HTTP status code less than 500 ([AWS SDK source](https://github.com/aws/aws-sdk-java/blob/7844c64cf248aed889811bf2e871ad6b276a89ca/aws-java-sdk-core/src/main/java/com/amazonaws/http/JsonErrorResponseHandler.java#L119-L121)).
    
    Perhaps we can perform exponential backoff for the following exceptions:
     * `Client` errors of type `ProvisionedThroughputExceededException`
     * All `Service` errors (server-side failures, e.g. HTTP 500, 503)
     * All `Unknown` errors (these appear to be limited to errors unmarshalling the Kinesis service response)
    
    All other exceptions can be rethrown.
    
    What are your thoughts?
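    
    The classification proposed above could be sketched roughly as follows. To keep the example compilable without the AWS SDK on the classpath, `ServiceError` and `ThroughputExceeded` are hypothetical stand-ins for `AmazonServiceException` and `ProvisionedThroughputExceededException`, and the local `ErrorType` enum mirrors the SDK's `AmazonServiceException.ErrorType`, which names the server-side category `Service`. This is an illustration of the rule, not the code merged in this pull request.
    
    ```java
    // Stand-in types (assumptions) so the sketch compiles without the AWS SDK.
    class RecoverableErrorSketch {
    
        /** Mirrors the values of AmazonServiceException.ErrorType. */
        enum ErrorType { Client, Service, Unknown }
    
        /** Hypothetical stand-in for AmazonServiceException. */
        static class ServiceError extends RuntimeException {
            final ErrorType errorType;
    
            ServiceError(String message, ErrorType errorType) {
                super(message);
                this.errorType = errorType;
            }
        }
    
        /** Hypothetical stand-in for ProvisionedThroughputExceededException (an HTTP 400 client error). */
        static class ThroughputExceeded extends ServiceError {
            ThroughputExceeded(String message) {
                super(message, ErrorType.Client);
            }
        }
    
        /** Returns true if retrying with exponential backoff is worthwhile for this error. */
        static boolean isRecoverable(ServiceError ex) {
            if (ex.errorType == null) {
                return false;
            }
            switch (ex.errorType) {
                case Client:
                    // Only throttling is worth retrying; other client errors will fail again.
                    return ex instanceof ThroughputExceeded;
                case Service:
                case Unknown:
                    return true;
                default:
                    return false;
            }
        }
    }
    ```
    
    For example, `isRecoverable(new ServiceError("bad request", ErrorType.Client))` is false, while a `ThroughputExceeded` or any `Service` error is retried.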



[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145754
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
     		return getShardIteratorResult.getShardIterator();
     	}
     
    +	/**
    +	 * Determines whether the exception can be recovered from using
    +	 * exponential-backoff
    +	 * 
    +	 * @param ex
    +	 *            Exception to inspect
    +	 * @return <code>true</code> if the exception can be recovered from, else
    +	 *         <code>false</code>
    +	 */
    +	protected static boolean isRecoverableException(AmazonServiceException ex) {
    +		if (ex.getErrorType() == null) {
    +			return false;
    +		}
    +
    +		switch (ex.getErrorType()) {
    --- End diff --
    
    The indentation for the cases here seems to be missing.



[GitHub] flink issue #3078: [FLINK-5355] Handle AmazonKinesisException gracefully in ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3078
  
    Thank you @skidder for addressing the comments! Merging...



[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3078



[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145793
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.proxy;
    +
    +import static org.junit.Assert.*;
    --- End diff --
    
    In Flink we generally try to avoid asterisk (wildcard) imports.
    The style check doesn't actually cover the test scope, but it would be good to stay consistent with the style rules in tests as well.



[GitHub] flink issue #3078: [FLINK-5355] Handle AmazonKinesisException gracefully in ...

Posted by skidder <gi...@git.apache.org>.
Github user skidder commented on the issue:

    https://github.com/apache/flink/pull/3078
  
    Thank you @tzulitai for the feedback on the styling! I've pushed a commit that addresses your comments.



[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145761
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
     		return getShardIteratorResult.getShardIterator();
     	}
     
    +	/**
    +	 * Determines whether the exception can be recovered from using
    +	 * exponential-backoff
    +	 * 
    +	 * @param ex
    +	 *            Exception to inspect
    +	 * @return <code>true</code> if the exception can be recovered from, else
    +	 *         <code>false</code>
    +	 */
    +	protected static boolean isRecoverableException(AmazonServiceException ex) {
    +		if (ex.getErrorType() == null) {
    +			return false;
    +		}
    +
    +		switch (ex.getErrorType()) {
    +		case Client:
    +			if (ex instanceof ProvisionedThroughputExceededException) {
    --- End diff --
    
    It'll probably be cleaner to just return `ex instanceof ProvisionedThroughputExceededException` directly.



[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145735
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
     		return getShardIteratorResult.getShardIterator();
     	}
     
    +	/**
    +	 * Determines whether the exception can be recovered from using
    +	 * exponential-backoff
    +	 * 
    +	 * @param ex
    +	 *            Exception to inspect
    --- End diff --
    
    nit: I think the Javadoc formatting here is inconsistent with the other methods (the line break after `@param ex`).



[GitHub] flink issue #3078: [FLINK-5355] Handle AmazonKinesisException gracefully in ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3078
  
    Thank you for opening a pull request for this @skidder !
    
    I think `AmazonServiceException` will also be thrown for client errors (e.g., wrong parameters for API calls). Whether an exception is caused by the client or the service is indicated by `AmazonServiceException#getErrorType()`, which returns either `ErrorType.Client`, `ErrorType.Service`, or `ErrorType.Unknown`.
    
    So, instead of simply catching an `AmazonServiceException`, I wonder if we should only perform exponential backoff for service exceptions with the `Service` and `Unknown` error types. We also need to check which type `ProvisionedThroughputExceededException` has, and whether it is reported as `Service` or `Unknown`; if not, we should handle that case as well.
    
    What do you think?



[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145784
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.proxy;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import com.amazonaws.AmazonServiceException;
    +import com.amazonaws.AmazonServiceException.ErrorType;
    +import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +
    +/**
    + * Test for methods in the KinesisProxy class.
    --- End diff --
    
    The `KinesisProxy` reference should be a link (`{@link KinesisProxy}`), like in other Javadocs.

