You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "chao.wu (JIRA)" <ji...@apache.org> on 2018/01/23 05:23:02 UTC
[jira] [Created] (KAFKA-6470) no continuous offset for function
seek
chao.wu created KAFKA-6470:
------------------------------
Summary: no continuous offset for function seek
Key: KAFKA-6470
URL: https://issues.apache.org/jira/browse/KAFKA-6470
Project: Kafka
Issue Type: Bug
Components: clients, core
Affects Versions: 0.10.0.1
Reporter: chao.wu
A topic-partition "adn-tracking,15" in kafka who's earliest offset is 1255644602 and latest offset is 1271253441.
while starting a spark streaming to process the data from the topic , we got a exception with "Got wrong record XXXX even after seeking to offset 1266921577".
I implemented a simple project to use consumer to seek offset 1266921577. But it return the offset 1266921578. Then while seek to 1266921576, it return the 1266921576 exactly。
Why ? How to fix that ?
There is the code:
public class consumerDemo {
public static void main(String[] argv){
Properties props = new Properties();
props.put("bootstrap.servers", "172.31.29.31:9091");
props.put("group.id", "consumer-tutorial-demo");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
collection.add(tp);
consumer.assign(collection);
consumer.seek(tp, 1266921576);
ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);
List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
Iterator<ConsumerRecord<String, String> > iter = listR.iterator();
ConsumerRecord<String, String> record = iter.next();
System.out.println(" the next record " + record.offset() + " recode topic " + record.topic());
}
}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)