Posted to issues@nifi.apache.org by "Oleg Zhurakousky (JIRA)" <ji...@apache.org> on 2016/07/28 20:15:20 UTC

[jira] [Commented] (NIFI-2322) ConsumeKafka gets stuck (cannot be stopped)

    [ https://issues.apache.org/jira/browse/NIFI-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15398135#comment-15398135 ] 

Oleg Zhurakousky commented on NIFI-2322:
----------------------------------------

At this point there seems to be nothing I can do other than perform a custom check of whether a connection to the specified server is possible:
{code}
// Fail fast if the broker is unreachable (10-second connect timeout)
try (Socket client = new Socket()) {
    client.connect(new InetSocketAddress("host", 9092), 10000);
}
{code}
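A minimal sketch of how such a check could be wrapped in a helper, assuming a plain TCP connect is an acceptable proxy for "the broker is reachable". The class name `BrokerReachability` and the method `isReachable` are hypothetical and not part of NiFi or Kafka; only `java.net` calls are used.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of NiFi or the Kafka client.
final class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        // try-with-resources closes the socket whether or not connect() succeeds
        try (Socket client = new Socket()) {
            client.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, unresolvable host, and connect timeout
            return false;
        }
    }

    public static void main(String[] args) {
        // "localhost" and 9092 are placeholder values for illustration
        System.out.println(isReachable("localhost", 9092, 10000));
    }
}
```

A processor could run a check like this before creating the consumer and yield if it returns false. A successful connect only shows that something is listening on the port, not that it is a working Kafka broker.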
I tried every consumer property related to timeouts:
{code}
session.timeout.ms
request.timeout.ms
fetch.max.wait.ms
reconnect.backoff.ms
retry.backoff.ms
{code}
. . . with no joy. KafkaConsumer blocks on poll(timeout) even when the timeout is set to 0. Yet the Kafka javadoc states:
{code}
/**
     * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
     * subscribed to any topics or partitions before polling for data.
     * <p>
     * On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
     * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
     * offset for the subscribed list of partitions
     *
     *
     * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
     *            immediately with any records that are available now. Must not be negative.
     * . . . .
     */
    @Override
    public ConsumerRecords<K, V> poll(long timeout) {
. . .
{code}
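Since poll(timeout) does not honor its timeout here, one generic way to bound any blocking call is to run it on a worker thread and wait on the resulting Future with a deadline. The following is only a sketch under stated assumptions: `BoundedCall` and `callWithDeadline` are hypothetical names, and `Thread.sleep` stands in for the blocked poll() so the example needs no Kafka dependency. Two caveats apply. Cancelling interrupts the worker thread, which helps only if the blocked call responds to interrupts, and KafkaConsumer is not safe for multi-threaded access, so the consumer would have to be used from that single worker thread only. KafkaConsumer also provides a thread-safe wakeup() method intended to abort a blocked poll; whether it helps in this particular situation has not been verified here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of NiFi or the Kafka client.
final class BoundedCall {

    /**
     * Runs blockingCall on the given executor and waits at most timeoutMillis.
     * Returns fallback if the deadline passes before the call completes.
     */
    static <T> T callWithDeadline(ExecutorService executor, Callable<T> blockingCall,
                                  long timeoutMillis, T fallback)
            throws InterruptedException, ExecutionException {
        Future<T> future = executor.submit(blockingCall);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupts the worker; a call that ignores interrupts keeps running
            future.cancel(true);
            return fallback;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Thread.sleep stands in for a poll() that does not return
            String result = callWithDeadline(executor, () -> {
                Thread.sleep(60_000);
                return "records";
            }, 500, "timed out");
            System.out.println(result);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The caller regains control after the deadline even if the worker stays blocked, which is the property the processor needs in order to be stoppable. A worker that never unblocks still holds its thread until the underlying call returns.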


> ConsumeKafka gets stuck (cannot be stopped)
> -------------------------------------------
>
>                 Key: NIFI-2322
>                 URL: https://issues.apache.org/jira/browse/NIFI-2322
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.0.0
>            Reporter: Haimo Liu
>             Fix For: 1.0.0
>
>         Attachments: Screen Shot 2016-07-19 at 2.14.32 PM.png, Screen Shot 2016-07-19 at 2.14.48 PM.png
>
>
> If the Kafka broker path is invalid or inaccurate, the ConsumeKafka processor can get stuck (concurrent tasks cannot be stopped in clustered mode). Please refer to the attached images. It appears that a configurable timeout property would potentially solve the problem.


