You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "karthik (Jira)" <ji...@apache.org> on 2021/10/02 13:39:00 UTC
[jira] [Updated] (KAFKA-13339) Kstream not fetch all the messages
[ https://issues.apache.org/jira/browse/KAFKA-13339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
karthik updated KAFKA-13339:
----------------------------
Description:
i used the below Kstream code for fetch all the records from my topic and facing below error.
code :
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers1);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<Integer, String> kStream = streamsBuilder.stream(AppConfigs.topicName);
kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));
//kStream.peek((k,v)-> System.out.println("Key= " + k + " Value= " + v));
Topology topology = streamsBuilder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
System.out.println("Starting stream.");
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() ->
{ System.out.println("Shutting down stream"); streams.close(); }
));
*Error :*
DEBUG o.a.k.clients.FetchSessionHandler - [Consumer clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer, groupId=MESProducer] Built incremental fetch (sessionId=797093970, epoch=417) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
18:22:16.088 [Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer, groupId=Producer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test-topic-0)) to broker
DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer, groupId=MESProducer] Added READ_UNCOMMITTED fetch request for partition test-topic-1 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch
was:
i used the below Kstream code for fetch all the records from my topic and facing below error.
code :
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers1);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<Integer, String> kStream = streamsBuilder.stream(AppConfigs.topicName);
kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));
//kStream.peek((k,v)-> System.out.println("Key= " + k + " Value= " + v));
Topology topology = streamsBuilder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
System.out.println("Starting stream.");
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() ->
{ System.out.println("Shutting down stream"); streams.close(); }
));
*Error :*
DEBUG o.a.k.clients.FetchSessionHandler - [Consumer clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer, groupId=MESProducer] Built incremental fetch (sessionId=797093970, epoch=417) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
18:22:16.088 [Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer, groupId=Producer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test-topic-0)) to broker
DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer, groupId=MESProducer] Added READ_UNCOMMITTED fetch request for partition pi-hist-topic-1 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch
> Kstream not fetch all the messages
> ----------------------------------
>
> Key: KAFKA-13339
> URL: https://issues.apache.org/jira/browse/KAFKA-13339
> Project: Kafka
> Issue Type: New Feature
> Components: consumer
> Reporter: karthik
> Priority: Major
>
> i used the below Kstream code for fetch all the records from my topic and facing below error.
>
> code :
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers1);
> props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
> props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> StreamsBuilder streamsBuilder = new StreamsBuilder();
> KStream<Integer, String> kStream = streamsBuilder.stream(AppConfigs.topicName);
> kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));
> //kStream.peek((k,v)-> System.out.println("Key= " + k + " Value= " + v));
> Topology topology = streamsBuilder.build();
> KafkaStreams streams = new KafkaStreams(topology, props);
> System.out.println("Starting stream.");
> streams.start();
> Runtime.getRuntime().addShutdownHook(new Thread(() ->
> { System.out.println("Shutting down stream"); streams.close(); }
> ));
>
> *Error :*
> DEBUG o.a.k.clients.FetchSessionHandler - [Consumer clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer, groupId=MESProducer] Built incremental fetch (sessionId=797093970, epoch=417) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
> 18:22:16.088 [Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer, groupId=Producer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test-topic-0)) to broker
> DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer, groupId=MESProducer] Added READ_UNCOMMITTED fetch request for partition test-topic-1 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)