You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by R Krishna <kr...@gmail.com> on 2016/03/22 08:12:15 UTC

Starting v0.9.0.1 consumers from beginning only first time does not work auto.offset.reset=earliest

We are just evaluating Kafka starting with the latest v0.9.0.1. A very
basic use case is to start initial consumers to begin from the earliest
entry only for the first time, the second restarts will start from an
offset wherever they left off. From what I read, you can set
auto.offset.reset = earliest to do this, but it does not seem to work.

consumer.subscribe(Collections.singletonList(KafkaProperties.topic));
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s",
                    record.offset(), record.key(), record.value());
        }



LOGS:
[he.kafka.clients.consumer.ConsumerConfig]  - ConsumerConfig values:
    request.timeout.ms = 40000
    check.crcs = true
    retry.backoff.ms = 100
    ssl.truststore.password = null
    ssl.keymanager.algorithm = SunX509
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.key.password = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.provider = null
    sasl.kerberos.service.name = null
    session.timeout.ms = 30000
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [10.30.26.98:32774]
    client.id = consumer-7
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    key.deserializer = class org.apache.kafka.common.serialization.
IntegerDeserializer
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    auto.offset.reset = earliest
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
    value.deserializer = class org.apache.kafka.common.serialization.
StringDeserializer
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.
RangeAssignor]
    ssl.endpoint.identification.algorithm = null
    max.partition.fetch.bytes = 1048576
    ssl.keystore.location = null
    ssl.truststore.location = null
    ssl.keystore.password = null
    metrics.sample.window.ms = 30000
    metadata.max.age.ms = 300000
    security.protocol = PLAINTEXT
    auto.commit.interval.ms = 1000
    ssl.protocol = TLS
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.trustmanager.algorithm = PKIX
    group.id = DemoConsumer7
    enable.auto.commit = true
    metric.reporters = []
    ssl.truststore.type = JKS
    send.buffer.bytes = 131072
    reconnect.backoff.ms = 50
    metrics.num.samples = 2
    ssl.keystore.type = JKS
    heartbeat.interval.ms = 3000

 - 2016-03-21 17:11:34.786 DEBUG main       [che.kafka.clients.consumer.
KafkaConsumer]  - Starting the Kafka consumer
 - 2016-03-21 17:11:39.318 DEBUG main       [org.apache.kafka.clients.
Metadata       ]  - Updated cluster metadata version 1 to Cluster(nodes = [
Node(-1, 10.30.26.98, 32774)], partitions = [])
 - 2016-03-21 17:11:39.406 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name connections-closed:client-id-consumer-7
 - 2016-03-21 17:11:39.414 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name connections-created:client-id-consumer-
7
 - 2016-03-21 17:11:39.415 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name bytes-sent-received:client-id-consumer-
7
 - 2016-03-21 17:11:39.415 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name bytes-sent:client-id-consumer-7
 - 2016-03-21 17:11:39.418 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name bytes-received:client-id-consumer-7
 - 2016-03-21 17:11:39.418 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name select-time:client-id-consumer-7
 - 2016-03-21 17:11:39.419 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name io-time:client-id-consumer-7
 - 2016-03-21 17:11:39.448 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name heartbeat-latency
 - 2016-03-21 17:11:39.448 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name join-latency
 - 2016-03-21 17:11:39.448 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name sync-latency
 - 2016-03-21 17:11:39.450 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name commit-latency
 - 2016-03-21 17:11:39.456 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name bytes-fetched
 - 2016-03-21 17:11:39.456 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name records-fetched
 - 2016-03-21 17:11:39.456 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name fetch-latency
 - 2016-03-21 17:11:39.457 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name records-lag
 - 2016-03-21 17:11:39.457 DEBUG main       [org.apache.kafka.common.metrics
.Metrics ]  - Added sensor with name fetch-throttle-time
 - 2016-03-21 17:11:39.458 INFO  main       [.apache.kafka.common.utils.
AppInfoParser]  - Kafka version : 0.9.0.1
 - 2016-03-21 17:11:39.458 INFO  main       [.apache.kafka.common.utils.
AppInfoParser]  - Kafka commitId : 23c69d62a0cabf06
 - 2016-03-21 17:11:39.459 DEBUG main       [che.kafka.clients.consumer.
KafkaConsumer]  - Kafka consumer created
 - 2016-03-21 17:11:39.460 INFO  KafkaConsumerExample [s.nextgen.kafka.
examples.SimpleConsumer6]  - [KafkaConsumerExample], Starting
 - 2016-03-21 17:11:39.460 INFO  KafkaConsumerExample [s.nextgen.kafka.
examples.SimpleConsumer6]  - [KafkaConsumerExample], Starting
 - 2016-03-21 17:11:39.461 DEBUG KafkaConsumerExample [che.kafka.clients.
consumer.KafkaConsumer]  - Subscribed to topic(s): test-topic
 - 2016-03-21 17:11:39.461 DEBUG KafkaConsumerExample [s.consumer.internals.
AbstractCoordinator]  - Issuing group metadata request to broker -1
 - 2016-03-21 17:11:39.473 DEBUG KafkaConsumerExample [org.apache.kafka.
clients.NetworkClient  ]  - Initiating connection to node -1 at 10.30.26.98:
32774.
 - 2016-03-21 17:11:39.479 DEBUG KafkaConsumerExample [org.apache.kafka.
common.metrics.Metrics ]  - Added sensor with name node--1.bytes-sent
 - 2016-03-21 17:11:39.480 DEBUG KafkaConsumerExample [org.apache.kafka.
common.metrics.Metrics ]  - Added sensor with name node--1.bytes-received
 - 2016-03-21 17:11:39.480 DEBUG KafkaConsumerExample [org.apache.kafka.
common.metrics.Metrics ]  - Added sensor with name node--1.latency
 - 2016-03-21 17:11:39.480 DEBUG KafkaConsumerExample [org.apache.kafka.
clients.NetworkClient  ]  - Completed connection to node -1
 - 2016-03-21 17:11:39.576 DEBUG KafkaConsumerExample [org.apache.kafka.
clients.NetworkClient  ]  - Sending metadata request ClientRequest(
expectResponse=true, callback=null, request=RequestSend(header={api_key=3,
api_version=0,correlation_id=1,client_id=consumer-7}, body={topics=[test-
topic]}), isInitiatedByNetworkClient, createdTimeMs=1458605499573,
sendTimeMs=0) to node -1
 - 2016-03-21 17:11:39.604 DEBUG KafkaConsumerExample [org.apache.kafka.
clients.Metadata       ]  - Updated cluster metadata version 2 to Cluster(nodes
= [Node(1001, 10.30.26.98, 32774)], partitions = [Partition(topic = test-
topic, partition = 8, leader = 1001, replicas = [1001,], isr = [1001,],
Partition(topic = test-topic, partition = 7, leader = 1001, replicas = [1001
,], isr = [1001,], Partition(topic = test-topic, partition = 2, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic = test-topic,
partition = 4, leader = 1001, replicas = [1001,], isr = [1001,],
Partition(topic
= test-topic, partition = 3, leader = 1001, replicas = [1001,], isr = [1001
,], Partition(topic = test-topic, partition = 6, leader = 1001, replicas = [
1001,], isr = [1001,], Partition(topic = test-topic, partition = 0, leader =
1001, replicas = [1001,], isr = [1001,], Partition(topic = test-topic,
partition = 1, leader = 1001, replicas = [1001,], isr = [1001,],
Partition(topic
= test-topic, partition = 5, leader = 1001, replicas = [1001,], isr = [1001
,], Partition(topic = test-topic, partition = 9, leader = 1001, replicas = [
1001,], isr = [1001,]])
 - 2016-03-21 17:11:39.605 DEBUG KafkaConsumerExample [s.consumer.internals.
AbstractCoordinator]  - Issuing group metadata request to broker 1001
 - 2016-03-21 17:11:39.605 DEBUG KafkaConsumerExample [org.apache.kafka.
clients.NetworkClient  ]  - Initiating connection to node 1001 at 10.30.
26.98:32774.
 - 2016-03-21 17:11:39.610 DEBUG KafkaConsumerExample [org.apache.kafka.
common.metrics.Metrics ]  - Added sensor with name node-1001.bytes-sent
 - 2016-03-21 17:11:39.610 DEBUG KafkaConsumerExample [org.apache.kafka.
common.metrics.Metrics ]  - Added sensor with name node-1001.bytes-received
 - 2016-03-21 17:11:39.611 DEBUG KafkaConsumerExample [org.apache.kafka.
common.metrics.Metrics ]  - Added sensor with name node-1001.latency
 - 2016-03-21 17:11:39.611 DEBUG KafkaConsumerExample [org.apache.kafka.
clients.NetworkClient  ]  - Completed connection to node 1001
 - 2016-03-21 17:11:39.618 DEBUG KafkaConsumerExample [s.consumer.internals.
AbstractCoordinator]  - Group metadata response ClientResponse(
receivedTimeMs=1458605499617, disconnected=false, request=ClientRequest(
expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.
ConsumerNetworkClient$RequestFutureCompletionHandler@18399f62, request=
RequestSend(header={api_key=10,api_version=0,correlation_id=2,client_id=
consumer-7}, body={group_id=DemoConsumer7}), createdTimeMs=1458605499605,
sendTimeMs=1458605499611), responseBody={error_code=0,coordinator={node_id=
1001,host=10.30.26.98,port=32774}})
 - 2016-03-21 17:11:39.618 DEBUG KafkaConsumerExample [org.apache.kafka.
clients.NetworkClient  ]  - Initiating connection to node 2147482646 at
10.30.26.98:32774.
 - 2016-03-21 17:11:39.619 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Revoking previously assigned partitions []
 - 2016-03-21 17:11:39.619 DEBUG KafkaConsumerExample [s.consumer.internals.
AbstractCoordinator]  - (Re-)joining group DemoConsumer7
 - 2016-03-21 17:11:39.621 DEBUG KafkaConsumerExample [s.consumer.internals.
AbstractCoordinator]  - Issuing request (JOIN_GROUP: {group_id=DemoConsumer7
,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{
protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22
cap=22]}]}) to coordinator 2147482646
 - 2016-03-21 17:11:39.623 DEBUG KafkaConsumerExample [org.apache.kafka.
common.metrics.Metrics ]  - Added sensor with name node-2147482646.bytes-
sent
 - 2016-03-21 17:11:39.623 DEBUG KafkaConsumerExample [org.apache.kafka.
common.metrics.Metrics ]  - Added sensor with name node-2147482646.bytes-
received
 - 2016-03-21 17:11:39.624 DEBUG KafkaConsumerExample [org.apache.kafka.
common.metrics.Metrics ]  - Added sensor with name node-2147482646.latency
 - 2016-03-21 17:11:39.624 DEBUG KafkaConsumerExample [org.apache.kafka.
clients.NetworkClient  ]  - Completed connection to node 2147482646
 - 2016-03-21 17:11:39.631 DEBUG KafkaConsumerExample [s.consumer.internals.
AbstractCoordinator]  - Joined group: {error_code=0,generation_id=1,
group_protocol=range,leader_id=consumer-7-91f534c8-4443-4c2e-990f-
d1de045bc5ab,member_id=consumer-7-91f534c8-4443-4c2e-990f-d1de045bc5ab,
members=[{member_id=consumer-7-91f534c8-4443-4c2e-990f-d1de045bc5ab,
member_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}
 - 2016-03-21 17:11:39.631 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Performing range assignment for subscriptions {
consumer-7-91f534c8-4443-4c2e-990f-d1de045bc5ab=org.apache.kafka.clients.
consumer.internals.PartitionAssignor$Subscription@478e2443}
 - 2016-03-21 17:11:39.632 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Finished assignment: {consumer-7-91f534c8-4443-
4c2e-990f-d1de045bc5ab=org.apache.kafka.clients.consumer.internals.
PartitionAssignor$Assignment@550c9d49}
 - 2016-03-21 17:11:39.633 DEBUG KafkaConsumerExample [s.consumer.internals.
AbstractCoordinator]  - Issuing leader SyncGroup (SYNC_GROUP: {group_id=
DemoConsumer7,generation_id=1,member_id=consumer-7-91f534c8-4443-4c2e-990f-
d1de045bc5ab,group_assignment=[{member_id=consumer-7-91f534c8-4443-4c2e-990f
-d1de045bc5ab,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=66 cap=66
]}]}) to coordinator 2147482646
 - 2016-03-21 17:11:39.639 DEBUG KafkaConsumerExample [s.consumer.internals.
AbstractCoordinator]  - Received successful sync group response for group
DemoConsumer7: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0
lim=66 cap=66]}
 - 2016-03-21 17:11:39.641 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Setting newly assigned partitions [test-topic-8,
test-topic-7, test-topic-2, test-topic-4, test-topic-3, test-topic-6, test-
topic-1, test-topic-0, test-topic-5, test-topic-9]
 - 2016-03-21 17:11:39.641 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Fetching committed offsets for partitions: [test-
topic-8, test-topic-7, test-topic-2, test-topic-4, test-topic-3, test-topic-
6, test-topic-1, test-topic-0, test-topic-5, test-topic-9]
 - 2016-03-21 17:11:39.647 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - No committed offset for partition test-topic-8
 - 2016-03-21 17:11:39.647 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - No committed offset for partition test-topic-7
 - 2016-03-21 17:11:39.647 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - No committed offset for partition test-topic-2
 - 2016-03-21 17:11:39.647 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - No committed offset for partition test-topic-4
 - 2016-03-21 17:11:39.647 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - No committed offset for partition test-topic-3
 - 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - No committed offset for partition test-topic-6
 - 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - No committed offset for partition test-topic-0
 - 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - No committed offset for partition test-topic-1
 - 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - No committed offset for partition test-topic-5
 - 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - No committed offset for partition test-topic-9
 - 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Resetting offset for partition test-topic-8
to earliest offset.
 - 2016-03-21 17:11:39.653 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Fetched offset 1243403 for partition test-
topic-8
 - 2016-03-21 17:11:39.653 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Resetting offset for partition test-topic-7
to earliest offset.
 - 2016-03-21 17:11:39.656 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Fetched offset 1243467 for partition test-
topic-7
 - 2016-03-21 17:11:39.656 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Resetting offset for partition test-topic-2
to earliest offset.
 - 2016-03-21 17:11:39.660 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Fetched offset 1243403 for partition test-
topic-2
 - 2016-03-21 17:11:39.660 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Resetting offset for partition test-topic-4
to earliest offset.
 - 2016-03-21 17:11:39.664 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Fetched offset 1243512 for partition test-
topic-4
 - 2016-03-21 17:11:39.665 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Resetting offset for partition test-topic-3
to earliest offset.
 - 2016-03-21 17:11:39.668 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Fetched offset 1243531 for partition test-
topic-3
 - 2016-03-21 17:11:39.668 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Resetting offset for partition test-topic-6
to earliest offset.
 - 2016-03-21 17:11:39.672 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Fetched offset 1243538 for partition test-
topic-6
 - 2016-03-21 17:11:39.672 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Resetting offset for partition test-topic-0
to earliest offset.
 - 2016-03-21 17:11:39.676 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Fetched offset 1243465 for partition test-
topic-0
 - 2016-03-21 17:11:39.676 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Resetting offset for partition test-topic-1
to earliest offset.
 - 2016-03-21 17:11:39.679 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Fetched offset 1243369 for partition test-
topic-1
 - 2016-03-21 17:11:39.680 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Resetting offset for partition test-topic-5
to earliest offset.
 - 2016-03-21 17:11:39.684 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Fetched offset 1243480 for partition test-
topic-5
 - 2016-03-21 17:11:39.684 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Resetting offset for partition test-topic-9
to earliest offset.
 - 2016-03-21 17:11:39.688 DEBUG KafkaConsumerExample [kafka.clients.
consumer.internals.Fetcher]  - Fetched offset 1243441 for partition test-
topic-9
 - 2016-03-21 17:11:40.213 DEBUG KafkaConsumerExample [org.apache.kafka.
common.metrics.Metrics ]  - Added sensor with name topic.test-topic.bytes-
fetched
 - 2016-03-21 17:11:40.213 DEBUG KafkaConsumerExample [org.apache.kafka.
common.metrics.Metrics ]  - Added sensor with name topic.test-topic.records-
fetched
done
 - 2016-03-21 17:11:40.466 DEBUG KafkaConsumerExample [che.kafka.clients.
consumer.KafkaConsumer]  - Subscribed to topic(s): test-topic
 - 2016-03-21 17:11:40.659 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243403 for partition test-topic-8
 - 2016-03-21 17:11:40.660 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243467 for partition test-topic-7
 - 2016-03-21 17:11:40.660 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243403 for partition test-topic-2
 - 2016-03-21 17:11:40.660 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243512 for partition test-topic-4
 - 2016-03-21 17:11:40.661 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243531 for partition test-topic-3
 - 2016-03-21 17:11:40.661 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243538 for partition test-topic-6
 - 2016-03-21 17:11:40.661 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243465 for partition test-topic-0
 - 2016-03-21 17:11:40.661 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243369 for partition test-topic-1
 - 2016-03-21 17:11:40.661 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243480 for partition test-topic-5
 - 2016-03-21 17:11:40.662 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243441 for partition test-topic-9
done
 - 2016-03-21 17:11:41.467 DEBUG KafkaConsumerExample [che.kafka.clients.
consumer.KafkaConsumer]  - Subscribed to topic(s): test-topic
 - 2016-03-21 17:11:41.648 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243403 for partition test-topic-8
 - 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243467 for partition test-topic-7
 - 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243403 for partition test-topic-2
 - 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243512 for partition test-topic-4
 - 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243531 for partition test-topic-3
 - 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243538 for partition test-topic-6
 - 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243465 for partition test-topic-0
 - 2016-03-21 17:11:41.650 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243369 for partition test-topic-1
 - 2016-03-21 17:11:41.650 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243480 for partition test-topic-5
 - 2016-03-21 17:11:41.650 DEBUG KafkaConsumerExample [s.consumer.internals.
ConsumerCoordinator]  - Committed offset 1243441 for partition test-topic-9
done



I tried poll(0), seekToBeginning but nothing works:
                consumer.poll(0);
                consumer.seekToBeginning();
                consumer.poll(0);

The topic always shows there are many messages:
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
10.35.25.95:32774 --topic test-topic --time -1 --offsets 1 | awk -F  ":"
'{sum += $3} END {print sum}'
12434609

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --describe --group
DemoConsumer15 --bootstrap-server 10.35.25.95:32774
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
DemoConsumer15, test-topic, 0, 1243465, 1381871, 138406,
consumer-15_/10.2...
DemoConsumer15, test-topic, 1, 1243369, 1381771, 138402,
consumer-15_/10.2...
DemoConsumer15, test-topic, 2, 1243403, 1381762, 138359,
consumer-15_/10.2...
DemoConsumer15, test-topic, 3, 1243531, 1382003, 138472,
consumer-15_/10.2...
DemoConsumer15, test-topic, 4, 1243512, 1381979, 138467,
consumer-15_/10.2...
DemoConsumer15, test-topic, 5, 1243480, 1381880, 138400,
consumer-15_/10.2...



-- 
Kisna