Posted to issues@flink.apache.org by "mingleizhang (JIRA)" <ji...@apache.org> on 2017/04/19 07:13:41 UTC
[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974207#comment-15974207 ]
mingleizhang edited comment on FLINK-6311 at 4/19/17 7:13 AM:
--------------------------------------------------------------
[~tzulitai] Hi, here is some sample code for the fix; please take a look and let me know what you think. BTW, should we use more descriptive error messages than just "mainThread is null"? If so, what kind of messages should we put here? Thanks ~ :D
{code}
public void shutdownFetcher() {
    running = false;
    checkNotNull(mainThread, "mainThread is null.");
    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
    if (LOG.isInfoEnabled()) {
        LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
    }
    checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null.");
    shardConsumersExecutor.shutdownNow();
}
{code}
was (Author: mingleizhang):
[~tzulitai] Hi, here is some sample code for the fix; please take a look and let me know what you think. BTW, should we use a more descriptive error message than just "mainThread is null"? If so, what kind of message should we put here? Thanks ~ :D
{code}
public void shutdownFetcher() {
    running = false;
    checkNotNull(mainThread, "mainThread is null.");
    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
    if (LOG.isInfoEnabled()) {
        LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
    }
    checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null.");
    shardConsumersExecutor.shutdownNow();
}
{code}
> NPE in FlinkKinesisConsumer if source was closed before run
> -----------------------------------------------------------
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
> Issue Type: Bug
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: mingleizhang
>
> This was reported by a user: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} does not guard against the case where the source is closed before it starts running. Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.
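
One point that may be worth weighing against the description above: {{checkNotNull}} itself throws a NullPointerException when its argument is null, so a close-before-run would still fail with an NPE, only with a clearer message. If the intent is for shutdown to be a harmless no-op in that case, a plain null guard is one option. The following is only a minimal sketch under that assumption, not the actual connector code: the class FetcherShutdownSketch and the methods runFetcher, isRunning, and isExecutorShutdown are hypothetical stand-ins, and only the field names come from the issue.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for KinesisDataFetcher; only the shutdown path is modeled. */
class FetcherShutdownSketch {
    private volatile boolean running = true;

    // Both fields stay null until runFetcher() has been called.
    private volatile Thread mainThread;
    private volatile ExecutorService shardConsumersExecutor;

    /** Simulates the start of the run path: records the caller and creates the executor. */
    public void runFetcher() {
        mainThread = Thread.currentThread();
        shardConsumersExecutor = Executors.newCachedThreadPool();
    }

    /** Safe to call even if runFetcher() was never invoked. */
    public void shutdownFetcher() {
        running = false;

        // Copy to locals so the null check and the call see the same reference.
        Thread thread = mainThread;
        if (thread != null) {
            thread.interrupt(); // the main thread may be sleeping
        }

        ExecutorService executor = shardConsumersExecutor;
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    public boolean isRunning() {
        return running;
    }

    /** True only if the executor was created and has since been shut down. */
    public boolean isExecutorShutdown() {
        ExecutorService executor = shardConsumersExecutor;
        return executor != null && executor.isShutdown();
    }

    public static void main(String[] args) {
        FetcherShutdownSketch neverStarted = new FetcherShutdownSketch();
        neverStarted.shutdownFetcher(); // closed before run: must not throw
        System.out.println("running after close-before-run: " + neverStarted.isRunning());
    }
}
```

Whether to skip silently or to log that the fetcher was never started is a design choice the sketch leaves open.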