Posted to users@kafka.apache.org by Srikrishna Alla <al...@gmail.com> on 2017/01/23 19:47:49 UTC

Kafka Connect Consumer throwing ILLEGAL_GENERATION when committing offsets and going into re-balance state

Hi,

I am running a Kafka Connect sink connector with Kafka 0.9.0.2. My consumer
periodically throws an error when committing offsets and then goes into a
group rebalance. Please let me know what can be done to fix this issue.
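
For context on the stack traces below: my understanding is that the sink task
commits the consumer's offsets synchronously, including from the rebalance
callback (WorkerSinkTask$HandleRebalance.onPartitionsRevoked -> commitOffsets
-> commitSync in the traces). A minimal standalone sketch of that pattern
against the 0.9 consumer API follows; the class name and settings are
illustrative, not the actual WorkerSinkTask code:

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevokeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "clpd355:6667");
        props.put("group.id", "connect-jdbc-sink-connector");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("monitorAlert"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit before giving up the partitions. If the group generation has
                // already moved on, this commit fails with ILLEGAL_GENERATION /
                // CommitFailedException, as in the traces below.
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Consumption resumes from the last committed offsets for the new assignment.
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            // ... deliver records to the sink, then commit the consumed offsets ...
            consumer.commitSync();
        }
    }
}

The point is only to show where commitSync() sits relative to the rebalance in
the traces; the actual connector of course runs inside the Connect runtime,
not this code.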

2017-01-20 17:22:21 INFO  WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:21 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12755, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
2017-01-20 17:22:21 DEBUG NetworkClient : 619 - Sending metadata request
ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=74,client_id=consumer-3},
body={topics=[monitorAlert]}), isInitiatedByNetworkClient,
createdTimeMs=1484954541923, sendTimeMs=0) to node 1001
2017-01-20 17:22:21 DEBUG Selector : 307 - Connection with clpd355.sldc./135.41.3.85 disconnected
java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:907)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:852)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
2017-01-20 17:22:21 DEBUG NetworkClient : 454 - Node 2147482646
disconnected.
2017-01-20 17:22:21 DEBUG ConsumerNetworkClient : 376 - Cancelled
OFFSET_COMMIT request ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@6428047c,
request=RequestSend(header={api_key=8,api_version=2,correlation_id=73,client_id=consumer-3},
body={group_id=connect-jdbc-sink
-connector,group_generation_id=1,member_id=consumer-3-5a029818-f6be-4fde-b7e0-1bf751ff9722,retention_time=-1,topics=[{topic=monitorAlert,partitions=[{partition=0,offset=12755,metadata=},{partition=1,offset=13113,metadata=},{partition=2,offset=12832,metadata=}]}]}),
createdTimeMs=1484954541923, sendTimeMs=0) with correlation id 73 due to
node 2147482646 being disconnected
2017-01-20 17:22:21 INFO  AbstractCoordinator : 535 - Marking the
coordinator 2147482646 dead.
2017-01-20 17:22:21 ERROR WorkerSinkTask : 101 - Commit of
WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
exception:
org.apache.kafka.common.errors.DisconnectException
2017-01-20 17:22:21 INFO  WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:21 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12755, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
2017-01-20 17:22:21 ERROR WorkerSinkTask : 101 - Commit of
WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
exception:
org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The
group coordinator is not available.
2017-01-20 17:22:21 DEBUG AbstractCoordinator : 471 - Issuing group
metadata request to broker 1001
2017-01-20 17:22:21 DEBUG Metadata : 172 - Updated cluster metadata version
6 to Cluster(nodes = [Node(1001, clpd355.sldc.sbc.com, 6667)], partitions =
[Partition(topic = monitorAlert, partition = 2, leader = 1001, replicas =
[1001,], isr = [1001,], Partition(topic = monitorAlert, partition = 0,
leader = 1001, replicas = [1001,], isr = [1001,], Partition(topic =
 monitorAlert, partition = 1, leader = 1001, replicas = [1001,], isr =
[1001,]])
2017-01-20 17:22:21 DEBUG AbstractCoordinator : 484 - Group metadata
response ClientResponse(receivedTimeMs=1484954541926, disconnected=false,
request=ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@4940d8ef,
request=RequestSend(header={api_key=10,api_version=0,correlation_id=75,client_id=consumer-3},
body={group_id=connect-jdbc-sink-connector}), createdTimeMs=1484954541925,
sendTimeMs=1484954541925),
responseBody={error_code=0,coordinator={node_id=1001,host=clpd355.sldc.sbc.com,port=6667}})
2017-01-20 17:22:21 INFO  AbstractCoordinator : 535 - Marking the
coordinator 2147482646 dead.

I see some group metadata requests after this, and then the
ILLEGAL_GENERATION error:

2017-01-20 17:22:21 DEBUG AbstractCoordinator : 484 - Group metadata
response ClientResponse(receivedTimeMs=1484954541975, disconnected=false,
request=ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@53c5eee0,
request=RequestSend(header={api_key=10,api_version=0,correlation_id=109,client_id=consumer-3},
body={group_id=connect-jdbc-sink-connector}), createdTimeMs=1484954541974,
sendTimeMs=1484954541974),
responseBody={error_code=0,coordinator={node_id=1001,host=clpd355.sldc.sbc.com,port=6667}})
2017-01-20 17:22:21 DEBUG NetworkClient : 487 - Initiating connection to
node 2147482646 at clpd355.sldc.sbc.com:6667.
2017-01-20 17:22:21 DEBUG SaslClientAuthenticator : 103 - Creating
SaslClient: client=cdap/clpd355@COREITLTSTL10DEV01;service=kafka;serviceHostname=clpd355.sld.com;mechs=[GSSAPI]
2017-01-20 17:22:21 DEBUG NetworkClient : 467 - Completed connection to
node 2147482646
2017-01-20 17:22:21 INFO  WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:21 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12755, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
2017-01-20 17:22:21 ERROR WorkerSinkTask : 101 - Commit of
WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
exception:
org.apache.kafka.clients.consumer.internals.SendFailedException
2017-01-20 17:22:25 ERROR ConsumerCoordinator : 544 - Error
ILLEGAL_GENERATION occurred while committing offsets for group
connect-jdbc-sink-connector
2017-01-20 17:22:25 ERROR WorkerSinkTask : 101 - Commit of
WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
exception:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed due to group rebalance
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:671)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:650)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:907)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:852)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
2017-01-20 17:22:25 INFO  WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:25 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12756, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 241 - Revoking previously
assigned partitions [monitorAlert-2, monitorAlert-1, monitorAlert-0]
2017-01-20 17:22:25 INFO  WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:25 ERROR ConsumerCoordinator : 544 - Error
ILLEGAL_GENERATION occurred while committing offsets for group
connect-jdbc-sink-connector
2017-01-20 17:22:25 ERROR WorkerSinkTask : 101 - Commit of
WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
exception:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed due to group rebalance
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:671)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:650)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:967)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:207)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:376)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:244)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:208)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:305)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:889)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:852)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
2017-01-20 17:22:25 ERROR ConsumerCoordinator : 544 - Error
ILLEGAL_GENERATION occurred while committing offsets for group
connect-jdbc-sink-connector
2017-01-20 17:22:25 DEBUG WorkerSinkTask : 96 - Got callback for timed out
commit Thread[WorkerSinkTask-jdbc-sink-connector-0,5,main]: -1, but most
recent commit is 47
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 310 - (Re-)joining group
connect-jdbc-sink-connector
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 324 - Issuing request
(JOIN_GROUP:
{group_id=connect-jdbc-sink-connector,session_timeout=300000,member_id=consumer-3-5a029818-f6be-4fde-b7e0-1bf751ff9722,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
lim=24 cap=24]}]}) to coordinator 2147482646
2017-01-20 17:22:25 INFO  AbstractCoordinator : 360 - Attempt to join group
connect-jdbc-sink-connector failed due to unknown member id, resetting and
retrying.
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 310 - (Re-)joining group
connect-jdbc-sink-connector
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 324 - Issuing request
(JOIN_GROUP:
{group_id=connect-jdbc-sink-connector,session_timeout=300000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
lim=24 cap=24]}]}) to coordinator 2147482646
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 342 - Joined group:
{error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,members=[{member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,member_metadata=java.nio.HeapByteBuffer[pos=0
lim=24 cap=24]}]}
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 219 - Performing range
assignment for subscriptions
{consumer-3-842b27da-535a-4715-aacc-1d38e2818586=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Subscription@76f0d523
}
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 223 - Finished assignment:
{consumer-3-842b27da-535a-4715-aacc-1d38e2818586=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Assignment@3d3929d6
}
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 403 - Issuing leader
SyncGroup (SYNC_GROUP:
{group_id=connect-jdbc-sink-connector,generation_id=1,member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,group_assignment=[{member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,member_assignment=java.nio.HeapByteBuffer[pos=0
lim=40 cap=40]}]}) to coordinator 2147482646
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 429 - Received successful
sync group response for group connect-jdbc-sink-connector:
{error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=40
cap=40]}
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 185 - Setting newly
assigned partitions [monitorAlert-2, monitorAlert-1, monitorAlert-0]
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 575 - Fetching committed
offsets for partitions: [monitorAlert-2, monitorAlert-1, monitorAlert-0]
2017-01-20 17:22:25 DEBUG Fetcher : 170 - Resetting offset for partition
monitorAlert-2 to the committed offset 12768
2017-01-20 17:22:25 DEBUG WorkerSinkTask : 341 - jdbc-sink-connector-0
assigned topic partition monitorAlert-2 with offset 12768
2017-01-20 17:22:25 DEBUG Fetcher : 170 - Resetting offset for partition
monitorAlert-0 to the committed offset 12690
2017-01-20 17:22:25 DEBUG WorkerSinkTask : 341 - jdbc-sink-connector-0
assigned topic partition monitorAlert-0 with offset 12690
2017-01-20 17:22:25 DEBUG Fetcher : 170 - Resetting offset for partition
monitorAlert-1 to the committed offset 13048
2017-01-20 17:22:25 DEBUG WorkerSinkTask : 341 - jdbc-sink-connector-0
assigned topic partition monitorAlert-1 with offset 13048
2017-01-20 17:22:25 INFO  WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:25 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12768, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12690, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13048, metadata=''}}
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 512 - Committed offset
12768 for partition monitorAlert-2
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 512 - Committed offset
13048 for partition monitorAlert-1
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 512 - Committed offset
12690 for partition monitorAlert-0
2017-01-20 17:22:25 DEBUG WorkerSinkTask : 104 - Finished
WorkerSinkTask{id=jdbc-sink-connector-0} offset commit successfully in 2 ms

Then it tries to commit offsets multiple times. Finally, it throws this
error -
2017-01-20 17:25:40 INFO  WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:25:40 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12768, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12690, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13048, metadata=''}}
2017-01-20 17:25:40 DEBUG ConsumerCoordinator : 512 - Committed offset
12768 for partition monitorAlert-2
2017-01-20 17:25:40 DEBUG ConsumerCoordinator : 512 - Committed offset
13048 for partition monitorAlert-1
2017-01-20 17:25:40 DEBUG ConsumerCoordinator : 512 - Committed offset
12690 for partition monitorAlert-0
2017-01-20 17:25:40 DEBUG WorkerSinkTask : 104 - Finished
WorkerSinkTask{id=jdbc-sink-connector-0} offset commit successfully in 2 ms
2017-01-20 17:25:40 DEBUG AbstractCoordinator : 621 - Received successful
heartbeat response.
2017-01-20 17:25:41 DEBUG session : 347 - Scavenging sessions at
1484954741657
2017-01-20 17:25:41 DEBUG Fetcher : 554 - Discarding fetch response for
partition monitorAlert-2 since its offset 12832 does not match the expected
offset 12768
2017-01-20 17:25:41 DEBUG Fetcher : 554 - Discarding fetch response for
partition monitorAlert-1 since its offset 13113 does not match the expected
offset 13048
2017-01-20 17:25:41 DEBUG Fetcher : 554 - Discarding fetch response for
partition monitorAlert-0 since its offset 12756 does not match the expected
offset 12690



Here is the configuration in my connect-distributed.properties-
bootstrap.servers=clpd355:6667
group.id=alert
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.flush.interval.ms=10000
config.storage.topic=connect-configs
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
producer.sasl.kerberos.service.name=kafka
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=GSSAPI
consumer.fetch.message.max.bytes=1048576
consumer.session.timeout.ms=300000
consumer.request.timeout.ms=310000
consumer.max.partition.fetch.bytes=1048576
consumer.heartbeat.interval.ms=60000
consumer.fetch.max.wait.ms=200000
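
For clarity on how these settings reach the sink task: my understanding is
that the worker strips the consumer. prefix and applies the remaining settings
to the consumer it creates for the sink task, and that consumer joins its own
group named after the connector (hence group connect-jdbc-sink-connector in
the logs, while the worker group.id is alert). A hypothetical sketch of that
mapping, not the actual Connect worker code:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SinkConsumerConfigSketch {

    // Hypothetical illustration: strip the "consumer." prefix from the worker
    // properties and pass the rest to the consumer created for the sink task.
    static Map<String, Object> effectiveConsumerConfig(Properties workerProps, String connectorName) {
        Map<String, Object> consumerProps = new HashMap<String, Object>();
        // The sink consumer joins a group named after the connector, which is why the
        // logs show group "connect-jdbc-sink-connector" rather than "alert".
        consumerProps.put("group.id", "connect-" + connectorName);
        consumerProps.put("bootstrap.servers", workerProps.getProperty("bootstrap.servers"));
        consumerProps.put("enable.auto.commit", "false");
        for (String key : workerProps.stringPropertyNames()) {
            if (key.startsWith("consumer.")) {
                consumerProps.put(key.substring("consumer.".length()), workerProps.getProperty(key).trim());
            }
        }
        return consumerProps;
    }

    public static void main(String[] args) {
        Properties workerProps = new Properties();
        workerProps.setProperty("bootstrap.servers", "clpd355:6667");
        workerProps.setProperty("group.id", "alert");
        workerProps.setProperty("consumer.session.timeout.ms", "300000");
        workerProps.setProperty("consumer.heartbeat.interval.ms", "60000");
        workerProps.setProperty("consumer.request.timeout.ms", "310000");
        // Prints session.timeout.ms=300000, heartbeat.interval.ms=60000,
        // request.timeout.ms=310000 plus the fixed group.id/bootstrap.servers entries.
        System.out.println(effectiveConsumerConfig(workerProps, "jdbc-sink-connector"));
    }
}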

Thanks,
Sri

Re: Kafka Connect Consumer throwing ILLEGAL_GENERATION when committing offsets and going into re-balance state

Posted by Srikrishna Alla <al...@gmail.com>.
I am seeing this in the server logs. It looks like the GroupCoordinator has
the Connect consumer group in a constant state of rebalance. Can someone
please take a look and let me know what is going wrong here?

[2017-01-20 16:45:33,187] INFO [GroupCoordinator 1001]: Preparing to
restabilize group alert with old generation 1 (kafka.coordinator.
GroupCoordinator)
[2017-01-20 16:45:53,335] INFO [GroupCoordinator 1001]: Stabilized group
alert generation 2 (kafka.coordinator.GroupCoordinator)
[2017-01-20 16:45:53,344] INFO [GroupCoordinator 1001]: Assignment received
from leader for group alert for generation 2 (kafka.coordinator.
GroupCoordinator)
[2017-01-20 16:45:53,710] INFO [GroupCoordinator 1001]: Preparing to
restabilize group connect-jdbc-sink-connector with old generation 0
(kafka.coordinator.GroupCoordinator)
[2017-01-20 16:45:53,710] INFO [GroupCoordinator 1001]: Stabilized group
connect-jdbc-sink-connector generation 1 (kafka.coordinator.
GroupCoordinator)
[2017-01-20 16:45:53,713] INFO [GroupCoordinator 1001]: Assignment received
from leader for group connect-jdbc-sink-connector for generation 1
(kafka.coordinator.GroupCoordinator)
[2017-01-20 16:50:53,715] INFO [GroupCoordinator 1001]: Preparing to
restabilize group connect-jdbc-sink-connector with old generation 1
(kafka.coordinator.GroupCoordinator)
[2017-01-20 16:50:53,717] INFO [GroupCoordinator 1001]: Group
connect-jdbc-sink-connector generation 1 is dead and removed
(kafka.coordinator.GroupCoordinator)

I am also seeing the GROUP_COORDINATOR request fail when checking the group
offsets -
[sa9726@clpd355 bin]$ ./kafka-consumer-groups.sh  --group
connect-jdbc-sink-connector --new-consumer --describe --bootstrap-server
clpd355:6667
Error while executing consumer group command Request GROUP_COORDINATOR
failed on brokers List(Node(-1, clpd355, 6667))
java.lang.RuntimeException: Request GROUP_COORDINATOR failed on brokers
List(Node(-1, clpd355, 6667))
    at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:73)
    at kafka.admin.AdminClient.findCoordinator(AdminClient.scala:78)
    at kafka.admin.AdminClient.describeGroup(AdminClient.scala:130)
    at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
    at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:326)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:96)
    at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:314)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:75)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)


Re: Kafka Connect Consumer throwing ILLEGAL_GENERATION when committing offsets and going into re-balance state

Posted by Srikrishna Alla <al...@gmail.com>.
Could someone please let me know what is going wrong in my Kafka cluster? I
would highly appreciate a response.

Thanks,
Sri

On Mon, Jan 23, 2017 at 1:47 PM, Srikrishna Alla <al...@gmail.com>
wrote:

> Hi,
>
> I am running a Kafka Sink Connector with Kafka 0.9.0.2. I am seeing that
> my consumer is periodically throwing an error when saving offsets and then
> going into group rebalance state. Please let me know what can be done to
> fix this issue
>
> 2017-01-20 17:22:21 INFO  WorkerSinkTask : 187 -
> WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
> 2017-01-20 17:22:21 DEBUG KafkaConsumer : 1023 - Committing offsets:
> {monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
> monitorAlert-0=OffsetAndMetadata{offset=12755, metadata=''},
> monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
> 2017-01-20 17:22:21 DEBUG NetworkClient : 619 - Sending metadata request
> ClientRequest(expectResponse=true, callback=null,
> request=RequestSend(header={api_key=3,api_version=0,
> correlation_id=74,client_id=consumer-3}, body={topics=[monitorAlert]}),
> isInitiatedByNetworkClient, createdTimeMs=1484954541923, sendTimeMs=0) to
> node 1001
> 2017-01-20 17:22:21 DEBUG Selector : 307 - Connection with clpd355.sldc./
> 135.41.3.85 disconnected
> java.io.EOFException
>         at org.apache.kafka.common.network.NetworkReceive.
> readFromReadableChannel(NetworkReceive.java:83)
>         at org.apache.kafka.common.network.NetworkReceive.
> readFrom(NetworkReceive.java:71)
>         at org.apache.kafka.common.network.KafkaChannel.receive(
> KafkaChannel.java:153)
>         at org.apache.kafka.common.network.KafkaChannel.read(
> KafkaChannel.java:134)
>         at org.apache.kafka.common.network.Selector.poll(
> Selector.java:286)
>         at org.apache.kafka.clients.NetworkClient.poll(
> NetworkClient.java:256)
>         at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:907)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:852)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(
> WorkerSinkTask.java:171)
>         at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.
> iteration(WorkerSinkTaskThread.java:90)
>         at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.
> execute(WorkerSinkTaskThread.java:58)
>         at org.apache.kafka.connect.util.ShutdownableThread.run(
> ShutdownableThread.java:82)
> 2017-01-20 17:22:21 DEBUG NetworkClient : 454 - Node 2147482646
> <(214)%20748-2646> disconnected.
> 2017-01-20 17:22:21 DEBUG ConsumerNetworkClient : 376 - Cancelled
> OFFSET_COMMIT request ClientRequest(expectResponse=true,
> callback=org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient$RequestFutureCompletionHandler@6428047c,
> request=RequestSend(header={api_key=8,api_version=2,
> correlation_id=73,client_id=consumer-3}, body={group_id=connect-jdbc-sink
> -connector,group_generation_id=1,member_id=consumer-3-
> 5a029818-f6be-4fde-b7e0-1bf751ff9722,retention_time=-
> 1,topics=[{topic=monitorAlert,partitions=[{partition=0,
> offset=12755,metadata=},{partition=1,offset=13113,metadata=},{partition=2,offset=12832,metadata=}]}]}),
> createdTimeMs=1484954541923, sendTimeMs=0) with correlation id 73 due to
> node 2147482646 being dis
> connected
> 2017-01-20 17:22:21 INFO  AbstractCoordinator : 535 - Marking the
> coordinator 2147482646 <(214)%20748-2646> dead.
> 2017-01-20 17:22:21 ERROR WorkerSinkTask : 101 - Commit of
> WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
> exception:
> org.apache.kafka.common.errors.DisconnectException
> 2017-01-20 17:22:21 INFO  WorkerSinkTask : 187 -
> WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
> 2017-01-20 17:22:21 DEBUG KafkaConsumer : 1023 - Committing offsets:
> {monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
> monitorAlert-0=OffsetAndMetadata{offset=12755, metadata=''},
> monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
> 2017-01-20 17:22:21 ERROR WorkerSinkTask : 101 - Commit of
> WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
> exception:
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The
> group coordinator is not available.
> 2017-01-20 17:22:21 DEBUG AbstractCoordinator : 471 - Issuing group
> metadata request to broker 1001
> 2017-01-20 17:22:21 DEBUG Metadata : 172 - Updated cluster metadata
> version 6 to Cluster(nodes = [Node(1001, clpd355.sldc.sbc.com, 6667)],
> partitions = [Partition(topic = monitorAlert, partition = 2, leader = 1001,
> replicas = [1001,], isr = [1001,], Partition(topic = monitorAlert,
> partition = 0, leader = 1001, replicas = [1001,], isr = [1001,],
> Partition(topic =
>  monitorAlert, partition = 1, leader = 1001, replicas = [1001,], isr =
> [1001,]])
> 2017-01-20 17:22:21 DEBUG AbstractCoordinator : 484 - Group metadata
> response ClientResponse(receivedTimeMs=1484954541926, disconnected=false,
> request=ClientRequest(expectResponse=true, callback=org.apache.kafka.
> clients.consumer.internals.ConsumerNetworkClient$
> RequestFutureCompletionHandler@4940d8ef, request=RequestSend(header={
> api_key=10,api_version=0,correlati
> on_id=75,client_id=consumer-3}, body={group_id=connect-jdbc-sink-connector}),
> createdTimeMs=1484954541925, sendTimeMs=1484954541925),
> responseBody={error_code=0,coordinator={node_id=1001,host=
> clpd355.sldc.sbc.com,port=6667}})
> 2017-01-20 17:22:21 INFO  AbstractCoordinator : 535 - Marking the
> coordinator 2147482646 <(214)%20748-2646> dead.
>
> I see some group metadata request calls after this and then I see the
> ILLEGAL_GENERATION error
>
> 2017-01-20 17:22:21 DEBUG AbstractCoordinator : 484 - Group metadata
> response ClientResponse(receivedTimeMs=1484954541975, disconnected=false,
> request=ClientRequest(expectResponse=true, callback=org.apache.kafka.
> clients.consumer.internals.ConsumerNetworkClient$
> RequestFutureCompletionHandler@53c5eee0, request=RequestSend(header={
> api_key=10,api_version=0,correlation_id=109,client_id=consumer-3},
> body={group_id=connect-jdbc-sink-connector}),
> createdTimeMs=1484954541974, sendTimeMs=1484954541974),
> responseBody={error_code=0,coordinator={node_id=1001,host=
> clpd355.sldc.sbc.com,port=6667}})
> 2017-01-20 17:22:21 DEBUG NetworkClient : 487 - Initiating connection to
> node 2147482646 <(214)%20748-2646> at clpd355.sldc.sbc.com:6667.
> 2017-01-20 17:22:21 DEBUG SaslClientAuthenticator : 103 - Creating
> SaslClient: client=cdap/clpd355@COREITLTSTL10DEV01;service=
> kafka;serviceHostname=clpd355.sld.com;mechs=[GSSAPI]
> 2017-01-20 17:22:21 DEBUG NetworkClient : 467 - Completed connection to
> node 2147482646 <(214)%20748-2646>
> 2017-01-20 17:22:21 INFO  WorkerSinkTask : 187 -
> WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
> 2017-01-20 17:22:21 DEBUG KafkaConsumer : 1023 - Committing offsets:
> {monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
> monitorAlert-0=OffsetAndMetadata{offset=12755, metadata=''},
> monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
> 2017-01-20 17:22:21 ERROR WorkerSinkTask : 101 - Commit of
> WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
> exception:
> org.apache.kafka.clients.consumer.internals.SendFailedException
> 2017-01-20 17:22:25 ERROR ConsumerCoordinator : 544 - Error
> ILLEGAL_GENERATION occurred while committing offsets for group
> connect-jdbc-sink-connector
> 2017-01-20 17:22:25 ERROR WorkerSinkTask : 101 - Commit of
> WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
> exception:
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed due to group rebalance
>         at org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator$OffsetCommitResponseHandler.
> handle(ConsumerCoordinator.java:546)
>         at org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator$OffsetCommitResponseHandler.
> handle(ConsumerCoordinator.java:487)
>         at org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator$CoordinatorResponseHandler.
> onSuccess(AbstractCoordinator.java:671)
>         at org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator$CoordinatorResponseHandler.
> onSuccess(AbstractCoordinator.java:650)
>         at org.apache.kafka.clients.consumer.internals.
> RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at org.apache.kafka.clients.consumer.internals.
> RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.
> RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(
> ConsumerNetworkClient.java:380)
>         at org.apache.kafka.clients.NetworkClient.poll(
> NetworkClient.java:274)
>         at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:907)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:852)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(
> WorkerSinkTask.java:171)
>         at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.
> iteration(WorkerSinkTaskThread.java:90)
>         at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.
> execute(WorkerSinkTaskThread.java:58)
>         at org.apache.kafka.connect.util.ShutdownableThread.run(
> ShutdownableThread.java:82)
> 2017-01-20 17:22:25 INFO  WorkerSinkTask : 187 -
> WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
> 2017-01-20 17:22:25 DEBUG KafkaConsumer : 1023 - Committing offsets:
> {monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
> monitorAlert-0=OffsetAndMetadata{offset=12756, metadata=''},
> monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
> 2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 241 - Revoking previously
> assigned partitions [monitorAlert-2, monitorAlert-1, monitorAlert-0]
> 2017-01-20 17:22:25 INFO  WorkerSinkTask : 187 -
> WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
> 2017-01-20 17:22:25 ERROR ConsumerCoordinator : 544 - Error
> ILLEGAL_GENERATION occurred while committing offsets for group
> connect-jdbc-sink-connector
> 2017-01-20 17:22:25 ERROR WorkerSinkTask : 101 - Commit of
> WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
> exception:
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed due to group rebalance
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:671)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:650)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:967)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:207)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:376)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:244)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:208)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:305)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:889)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:852)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
>         at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
>         at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
>         at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
> 2017-01-20 17:22:25 ERROR ConsumerCoordinator : 544 - Error
> ILLEGAL_GENERATION occurred while committing offsets for group
> connect-jdbc-sink-connector
> 2017-01-20 17:22:25 DEBUG WorkerSinkTask : 96 - Got callback for timed out
> commit Thread[WorkerSinkTask-jdbc-sink-connector-0,5,main]: -1, but most
> recent commit is 47
> 2017-01-20 17:22:25 DEBUG AbstractCoordinator : 310 - (Re-)joining group
> connect-jdbc-sink-connector
> 2017-01-20 17:22:25 DEBUG AbstractCoordinator : 324 - Issuing request
> (JOIN_GROUP: {group_id=connect-jdbc-sink-connector,session_timeout=300000,
> member_id=consumer-3-5a029818-f6be-4fde-b7e0-1bf751ff9722,protocol_type=consumer,
> group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=24 cap=24]}]})
> to coordinator 2147482646
> 2017-01-20 17:22:25 INFO  AbstractCoordinator : 360 - Attempt to join
> group connect-jdbc-sink-connector failed due to unknown member id,
> resetting and retrying.
> 2017-01-20 17:22:25 DEBUG AbstractCoordinator : 310 - (Re-)joining group
> connect-jdbc-sink-connector
> 2017-01-20 17:22:25 DEBUG AbstractCoordinator : 324 - Issuing request
> (JOIN_GROUP: {group_id=connect-jdbc-sink-connector,session_timeout=300000,
> member_id=,protocol_type=consumer,
> group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=24 cap=24]}]})
> to coordinator 2147482646
> 2017-01-20 17:22:25 DEBUG AbstractCoordinator : 342 - Joined group:
> {error_code=0,generation_id=1,group_protocol=range,
> leader_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,
> member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,
> members=[{member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,
> member_metadata=java.nio.HeapByteBuffer[pos=0 lim=24 cap=24]}]}
> 2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 219 - Performing range assignment for subscriptions
> {consumer-3-842b27da-535a-4715-aacc-1d38e2818586=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Subscription@76f0d523}
> 2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 223 - Finished assignment:
> {consumer-3-842b27da-535a-4715-aacc-1d38e2818586=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Assignment@3d3929d6}
> 2017-01-20 17:22:25 DEBUG AbstractCoordinator : 403 - Issuing leader SyncGroup
> (SYNC_GROUP: {group_id=connect-jdbc-sink-connector,generation_id=1,
> member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,
> group_assignment=[{member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,
> member_assignment=java.nio.HeapByteBuffer[pos=0 lim=40 cap=40]}]})
> to coordinator 2147482646
> 2017-01-20 17:22:25 DEBUG AbstractCoordinator : 429 - Received successful
> sync group response for group connect-jdbc-sink-connector:
> {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=40
> cap=40]}
> 2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 185 - Setting newly
> assigned partitions [monitorAlert-2, monitorAlert-1, monitorAlert-0]
> 2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 575 - Fetching committed
> offsets for partitions: [monitorAlert-2, monitorAlert-1, monitorAlert-0]
> 2017-01-20 17:22:25 DEBUG Fetcher : 170 - Resetting offset for partition
> monitorAlert-2 to the committed offset 12768
> 2017-01-20 17:22:25 DEBUG WorkerSinkTask : 341 - jdbc-sink-connector-0
> assigned topic partition monitorAlert-2 with offset 12768
> 2017-01-20 17:22:25 DEBUG Fetcher : 170 - Resetting offset for partition
> monitorAlert-0 to the committed offset 12690
> 2017-01-20 17:22:25 DEBUG WorkerSinkTask : 341 - jdbc-sink-connector-0
> assigned topic partition monitorAlert-0 with offset 12690
> 2017-01-20 17:22:25 DEBUG Fetcher : 170 - Resetting offset for partition
> monitorAlert-1 to the committed offset 13048
> 2017-01-20 17:22:25 DEBUG WorkerSinkTask : 341 - jdbc-sink-connector-0
> assigned topic partition monitorAlert-1 with offset 13048
> 2017-01-20 17:22:25 INFO  WorkerSinkTask : 187 -
> WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
> 2017-01-20 17:22:25 DEBUG KafkaConsumer : 1023 - Committing offsets:
> {monitorAlert-2=OffsetAndMetadata{offset=12768, metadata=''},
> monitorAlert-0=OffsetAndMetadata{offset=12690, metadata=''},
> monitorAlert-1=OffsetAndMetadata{offset=13048, metadata=''}}
> 2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 512 - Committed offset
> 12768 for partition monitorAlert-2
> 2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 512 - Committed offset
> 13048 for partition monitorAlert-1
> 2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 512 - Committed offset
> 12690 for partition monitorAlert-0
> 2017-01-20 17:22:25 DEBUG WorkerSinkTask : 104 - Finished
> WorkerSinkTask{id=jdbc-sink-connector-0} offset commit successfully in 2
> ms
>
> Then it tries to commit offsets multiple times. Finally, it throws this
> error -
> 2017-01-20 17:25:40 INFO  WorkerSinkTask : 187 -
> WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
> 2017-01-20 17:25:40 DEBUG KafkaConsumer : 1023 - Committing offsets:
> {monitorAlert-2=OffsetAndMetadata{offset=12768, metadata=''},
> monitorAlert-0=OffsetAndMetadata{offset=12690, metadata=''},
> monitorAlert-1=OffsetAndMetadata{offset=13048, metadata=''}}
> 2017-01-20 17:25:40 DEBUG ConsumerCoordinator : 512 - Committed offset
> 12768 for partition monitorAlert-2
> 2017-01-20 17:25:40 DEBUG ConsumerCoordinator : 512 - Committed offset
> 13048 for partition monitorAlert-1
> 2017-01-20 17:25:40 DEBUG ConsumerCoordinator : 512 - Committed offset
> 12690 for partition monitorAlert-0
> 2017-01-20 17:25:40 DEBUG WorkerSinkTask : 104 - Finished
> WorkerSinkTask{id=jdbc-sink-connector-0} offset commit successfully in 2
> ms
> 2017-01-20 17:25:40 DEBUG AbstractCoordinator : 621 - Received successful
> heartbeat response.
> 2017-01-20 17:25:41 DEBUG session : 347 - Scavenging sessions at
> 1484954741657
> 2017-01-20 17:25:41 DEBUG Fetcher : 554 - Discarding fetch response for
> partition monitorAlert-2 since its offset 12832 does not match the expected
> offset 12768
> 2017-01-20 17:25:41 DEBUG Fetcher : 554 - Discarding fetch response for
> partition monitorAlert-1 since its offset 13113 does not match the expected
> offset 13048
> 2017-01-20 17:25:41 DEBUG Fetcher : 554 - Discarding fetch response for
> partition monitorAlert-0 since its offset 12756 does not match the expected
> offset 12690
>
>
>
> Here is the configuration in my connect-distributed.properties-
> bootstrap.servers=clpd355:6667
> group.id=alert
> key.converter=org.apache.kafka.connect.storage.StringConverter
> value.converter=org.apache.kafka.connect.json.JsonConverter
> key.converter.schemas.enable=true
> value.converter.schemas.enable=true
> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
> offset.storage.topic=connect-offsets
> offset.flush.interval.ms=10000
> config.storage.topic=connect-configs
> security.protocol=SASL_PLAINTEXT
> sasl.mechanism=GSSAPI
> sasl.kerberos.service.name=kafka
> producer.sasl.kerberos.service.name=kafka
> producer.security.protocol=SASL_PLAINTEXT
> producer.sasl.mechanism=GSSAPI
> consumer.sasl.kerberos.service.name=kafka
> consumer.security.protocol=SASL_PLAINTEXT
> consumer.sasl.mechanism=GSSAPI
> consumer.fetch.message.max.bytes=1048576
> consumer.session.timeout.ms=300000
> consumer.request.timeout.ms=310000
> consumer.max.partition.fetch.bytes=1048576
> consumer.heartbeat.interval.ms=60000
> consumer.fetch.max.wait.ms=200000
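
For reference, the consumer.* overrides above can be exercised outside of Connect with a bare KafkaConsumer. The sketch below is a hypothetical test harness, not part of the original setup: the class name, topic, StringDeserializer choice, and the JAAS/keytab wiring are assumptions drawn from the log. Run it while the connector is stopped, or with a throw-away group.id, so the extra member does not itself trigger a rebalance. If commitSync() still fails with CommitFailedException under the same 300 s session timeout, that points at the session/heartbeat settings rather than the connector code.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MonitorAlertConsumerCheck {
    public static void main(String[] args) {
        // Run with -Djava.security.auth.login.config=<JAAS file> pointing at the same
        // Kerberos principal/keytab the Connect worker uses (assumption).
        Properties props = new Properties();
        props.put("bootstrap.servers", "clpd355:6667");
        // Group id as it appears in the log; Connect derives it from the connector name.
        props.put("group.id", "connect-jdbc-sink-connector");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        // Same timeout overrides as the consumer.* entries above; heartbeat.interval.ms
        // must stay well below session.timeout.ms, and fetch.max.wait.ms below request.timeout.ms.
        props.put("session.timeout.ms", "300000");
        props.put("heartbeat.interval.ms", "60000");
        props.put("request.timeout.ms", "310000");
        props.put("fetch.max.wait.ms", "200000");
        props.put("max.partition.fetch.bytes", "1048576");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Collections.singletonList("monitorAlert"));
            while (true) {
                // Poll and print offsets only; no sink processing, so any commit failure
                // here is independent of the JDBC sink task.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d@%d%n", record.topic(), record.partition(), record.offset());
                }
                consumer.commitSync(); // throws CommitFailedException if the group has rebalanced
            }
        } finally {
            consumer.close();
        }
    }
}
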
>
> Thanks,
> Sri
>