You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Darya Merkureva (JIRA)" <ji...@apache.org> on 2019/05/17 15:09:00 UTC

[jira] [Updated] (KAFKA-8380) We can not create a topic, immediately write to it and then read.

     [ https://issues.apache.org/jira/browse/KAFKA-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Darya Merkureva updated KAFKA-8380:
-----------------------------------
    Summary: We can not create a topic, immediately write to it and then read.  (was: I can not create a topic, immediately write to it and then read.)

> We can not create a topic, immediately write to it and then read.
> -----------------------------------------------------------------
>
>                 Key: KAFKA-8380
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8380
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.2.0
>            Reporter: Darya Merkureva
>            Priority: Blocker
>
> We are trying to create a topic, immediately write to it and read. 
> For some reason, we read nothing in spite of the fact that we are waiting for the completion of KafkaFuture. 
> {code:java}
> public class main {
> 	private static final String TOPIC_NAME = "topic";
> 	private static final String KEY_NAME = "key";
> 	public static void main(String[] args) {
> 		final Properties prodProps = new Properties();
> 		prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> 		prodProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
> 		prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000);
> 		prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> 		prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> 		final Producer<String, String> prod = new KafkaProducer<>(prodProps);
> 		final Properties admProps = new Properties();
> 		admProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> 		final AdminClient adm = KafkaAdminClient.create(admProps);
> 		final Properties consProps = new Properties();
> 		consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> 		consProps.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
> 		consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
> 		consProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
> 		consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
> 		consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
> 		consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
> 		final Consumer<String,String> cons = new KafkaConsumer<>(consProps);
> 		
> 		try {
> 			final NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, (short)1);
> 			val createTopicsResult = adm.createTopics(Collections.singleton(newTopic));
> 			createTopicsResult.values().get(TOPIC_NAME).get();
> 		} catch (InterruptedException | ExecutionException e) {
> 			if (!(e.getCause() instanceof TopicExistsException)) {
> 				throw new RuntimeException(e.getMessage(), e);
> 			}
> 		}
> 		
> 		final ProducerRecord<String, String> producerRecord =
> 				new ProducerRecord<>(TOPIC_NAME, KEY_NAME, "data");
> 		prod.send(producerRecord);
> 		prod.send(producerRecord);
> 		prod.send(producerRecord);
> 		prod.send(producerRecord);
> 		cons.subscribe(Arrays.asList(TOPIC_NAME));
> 		val records  = cons.poll(Duration.ofSeconds(10));
> 		for(var record: records){
> 			System.out.println(record.value());
> 		}
> 	}
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)