You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "chao.wu (JIRA)" <ji...@apache.org> on 2018/01/23 10:48:00 UTC

[jira] [Updated] (KAFKA-6470) no continuous offset for function seek

     [ https://issues.apache.org/jira/browse/KAFKA-6470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

chao.wu updated KAFKA-6470:
---------------------------
    Description: 
A topic-partition "adn-tracking,15"  in kafka  who's   earliest offset is  1255644602 and  latest offset is 1271253441.  

while starting a spark streaming to process the data from the topic ,  we got a exception with "Got wrong record XXXX  even after seeking to offset 1266921577".

I  implemented a simple project to use consumer to  seek offset 1266921577. But it return the offset 1266921578. Then while  seek to 1266921576, it return the 1266921576 exactly。 

Why ?  How to fix that ?

 

 

There is the code:

public class consumerDemo {

public static void main(String[] argv)

{

 

Properties props = new Properties();

props.put("bootstrap.servers", "172.31.29.31:9091");

props.put("group.id", "consumer-tutorial-demo");

props.put("key.deserializer", StringDeserializer.class.getName());

props.put("value.deserializer", StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

TopicPartition tp = new TopicPartition("adn-tracking-click", 15);

Collection<TopicPartition> collection = new ArrayList<TopicPartition>();

collection.add(tp);

consumer.assign(collection);

consumer.seek(tp, 1266921576);

ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);

List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);

Iterator<ConsumerRecord<String, String> > iter = listR.iterator();

ConsumerRecord<String, String> record = iter.next();

System.out.println(" the next record " + record.offset() + " recode topic " + record.topic()); }

}

 

 

 

    

  was:
A topic-partition "adn-tracking,15"  in kafka  who's   earliest offset is  1255644602 and  latest offset is 1271253441.  

while starting a spark streaming to process the data from the topic ,  we got a exception with "Got wrong record XXXX  even after seeking to offset 1266921577".

I  implemented a simple project to use consumer to  seek offset 1266921577. But it return the offset 1266921578. Then while  seek to 1266921576, it return the 1266921576 exactly。 

Why ?  How to fix that ?

 

 

There is the code:

public class consumerDemo {




 public static void main(String[] argv){
 Properties props = new Properties();
 props.put("bootstrap.servers", "172.31.29.31:9091");
 props.put("group.id", "consumer-tutorial-demo");
 props.put("key.deserializer", StringDeserializer.class.getName());
 props.put("value.deserializer", StringDeserializer.class.getName());
 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
 TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
 Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
 collection.add(tp);
 consumer.assign(collection);
 consumer.seek(tp, 1266921576);
 ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);

 List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
 Iterator<ConsumerRecord<String, String> > iter = listR.iterator();
 ConsumerRecord<String, String> record = iter.next();
 System.out.println(" the next record " + record.offset() + " recode topic " + record.topic());


 }
}

 

 

 

    


> no continuous offset for function seek
> --------------------------------------
>
>                 Key: KAFKA-6470
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6470
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, core
>    Affects Versions: 0.10.0.1
>            Reporter: chao.wu
>            Priority: Major
>
> A topic-partition "adn-tracking,15"  in kafka  who's   earliest offset is  1255644602 and  latest offset is 1271253441.  
> while starting a spark streaming to process the data from the topic ,  we got a exception with "Got wrong record XXXX  even after seeking to offset 1266921577".
> I  implemented a simple project to use consumer to  seek offset 1266921577. But it return the offset 1266921578. Then while  seek to 1266921576, it return the 1266921576 exactly。 
> Why ?  How to fix that ?
>  
>  
> There is the code:
> public class consumerDemo {
> public static void main(String[] argv)
> {
>  
> Properties props = new Properties();
> props.put("bootstrap.servers", "172.31.29.31:9091");
> props.put("group.id", "consumer-tutorial-demo");
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("value.deserializer", StringDeserializer.class.getName());
> KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
> TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
> Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
> collection.add(tp);
> consumer.assign(collection);
> consumer.seek(tp, 1266921576);
> ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);
> List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
> Iterator<ConsumerRecord<String, String> > iter = listR.iterator();
> ConsumerRecord<String, String> record = iter.next();
> System.out.println(" the next record " + record.offset() + " recode topic " + record.topic()); }
> }
>  
>  
>  
>     



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)