Posted to jira@kafka.apache.org by "Ning Zhang (Jira)" <ji...@apache.org> on 2020/08/07 02:06:00 UTC

[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

     [ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ning Zhang updated KAFKA-10370:
-------------------------------
    Description: 
In WorkerSinkTask.java, when we want the consumer to start consuming from specific offsets rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask).

In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and (2) call consumer.seek(tp, offset) to rewind the consumer.

When running (2), we saw the following IllegalStateException:

{code:java}
java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
{code}

As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*.
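
The failure mode described above can be illustrated with a toy model. This is a minimal sketch, not Kafka code: ToyConsumer and its plain String partition names are hypothetical stand-ins for KafkaConsumer and TopicPartition, and only the "is this partition currently assigned?" check that precedes a seek is modeled. Under those assumptions, it shows that seeking a partition with no current assignment throws, while assigning the partition first lets the seek succeed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model (NOT Kafka code) of a consumer that only
// tracks which partitions are assigned and where each one is positioned.
final class ToyConsumer {
    private final Set<String> assigned = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();

    // Loosely mirrors the idea of consumer.assign(...): the caller
    // chooses the partitions explicitly, replacing any previous set.
    void assign(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        // Drop positions of partitions that are no longer assigned.
        positions.keySet().retainAll(assigned);
    }

    // Loosely mirrors the check behind the reported stack trace:
    // a seek on a partition that is not assigned is rejected.
    void seek(String partition, long offset) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException(
                    "No current assignment for partition " + partition);
        }
        positions.put(partition, offset);
    }

    // Returns the position set by the last seek, or null if none was set.
    Long position(String partition) {
        return positions.get(partition);
    }
}
```

In this model, calling seek("mytopic-1", 42L) on a fresh ToyConsumer throws IllegalStateException, whereas calling assign with a set containing "mytopic-1" and then the same seek records the offset.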

  was:
In WorkerSinkTask.java, when we want the consumer to start consuming from specific offsets rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask).

In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and (2) call consumer.seek(tp, offset) to rewind the consumer.

When running (2), we saw the following IllegalStateException:
```
java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
```

As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*.


> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10370
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10370
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>    Affects Versions: 2.5.0
>            Reporter: Ning Zhang
>            Assignee: Ning Zhang
>            Priority: Major
>             Fix For: 2.6.0
>
>
> In WorkerSinkTask.java, when we want the consumer to start consuming from specific offsets rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask).
> In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and (2) call consumer.seek(tp, offset) to rewind the consumer.
> When running (2), we saw the following IllegalStateException:
> {code:java}
> java.lang.IllegalStateException: No current assignment for partition mytopic-1
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
> {code}
> As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)