Posted to users@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2022/06/08 04:38:27 UTC

Re: unsubscribed from all topics when adding a KTable

Hello Meir,

From the code snippet I cannot find where you added a KTable. It seems
you created a KStream from the source topic and aggregated the stream into
a KTable. Could you show me the code difference between "adding a KTable"
and "adding a KStream"?

In any case, that log line should only appear when `unsubscribe` is
explicitly called on the consumer, which happens in only two cases: 1) the
instance is shutting down (potentially due to an exception), or 2) the
instance is handling a task-migrated exception. In either case you should
see other log lines at INFO/WARN level indicating which case it is. I
suspect your code throws an exception right at startup that causes it to
shut down (i.e. case 1), but that should be easy to confirm from the other
log lines.
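
Note also that the `catch (Throwable e)` block in your snippet calls
`System.exit(1)` without printing anything, so an exception thrown on the
main thread would leave no trace on the client side. Below is a minimal
sketch of reporting the failure before exiting, using only the JDK. The
names `StartupGuard`, `Startup`, and `runReporting` are hypothetical and not
part of Kafka; the callback stands in for your `streams.start();
latch.await();` block. It only covers exceptions thrown on the calling
thread. Failures inside the Streams threads would instead show up in the
client's own log output, provided a logging backend is configured.

```java
import java.io.PrintStream;

/** Hypothetical helper (not part of Kafka): runs a startup action and reports any failure. */
final class StartupGuard {

    private StartupGuard() {}

    /** Stand-in for a block that may throw checked exceptions, e.g. latch.await(). */
    @FunctionalInterface
    interface Startup {
        void run() throws Exception;
    }

    /**
     * Runs the action on the calling thread and returns a process exit code:
     * 0 on success, 1 on failure. On failure the exception and its full
     * stack trace are written to err instead of being silently dropped.
     */
    static int runReporting(Startup startup, PrintStream err) {
        try {
            startup.run();
            return 0;
        } catch (Throwable t) {
            err.println("Startup failed: " + t);
            t.printStackTrace(err);
            return 1;
        }
    }
}
```

With this helper, the try/catch at the end of your program could be replaced
by `System.exit(StartupGuard.runReporting(() -> { streams.start();
latch.await(); }, System.err));`.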


Guozhang



On Fri, May 27, 2022 at 2:31 PM Meir Goldenberg <me...@hotmail.com>
wrote:

> Hi,
>
>
> I'm trying to write a very basic Kafka Streams consumer in Java.
> Once I add a KTable, I see a message in the server log that I have been
> unsubscribed from all topics.
> Doing the same with a KStream instead of KTable works fine for me.
>
> I'm using Kafka version 3.2.0 (kafka_2.13-3.2.0) and am running on
> Raspbian OS.
>
> I tried modifying the group.initial.rebalance.delay.ms in the server
> properties but this did not help.
>
> The message I get in the server log is:
>
> [2022-05-28 00:29:43,989] INFO [GroupCoordinator 0]: Dynamic member with
> unknown member id joins group streams-wiki-created-table in Empty state.
> Created a new member id
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> and request the member to rejoin with this id.
> (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,055] INFO [GroupCoordinator 0]: Preparing to
> rebalance group streams-wiki-created-table in state PreparingRebalance with
> old generation 2 (__consumer_offsets-16) (reason: Adding new member
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> with group instance id None; client reason: rebalance failed due to 'The
> group member needs to have a valid member id before actually entering a
> consumer group.' (MemberIdRequiredException))
> (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,089] INFO [GroupCoordinator 0]: Stabilized group
> streams-wiki-created-table generation 3 (__consumer_offsets-16) with 1
> members (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,458] INFO [GroupCoordinator 0]: Assignment received
> from leader
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> for group streams-wiki-created-table for generation 3. The group has 1
> members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,955] INFO [GroupCoordinator 0]: Preparing to
> rebalance group streams-wiki-created-table in state PreparingRebalance with
> old generation 3 (__consumer_offsets-16) (reason: Removing member
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> on LeaveGroup; client reason: the consumer unsubscribed from all topics)
> (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,960] INFO [GroupCoordinator 0]: Group
> streams-wiki-created-table with generation 4 is now empty
> (__consumer_offsets-16) (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,998] INFO [GroupCoordinator 0]: Member
> MemberMetadata(memberId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e,
> groupInstanceId=None,
> clientId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer,
> clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000,
> supportedProtocols=List(stream)) has left group streams-wiki-created-table
> through explicit `LeaveGroup`; client reason: the consumer unsubscribed
> from all topics (kafka.coordinator.group.GroupCoordinator)
>
>
> My code is as follows:
>
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "streams-wiki-created-table");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
>         props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
>         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>
>         TimeWindows monthWindow = TimeWindows.of(Duration.ofDays(30));
>         TimeWindows weekWindow = TimeWindows.of(Duration.ofDays(7));
>         TimeWindows dayWindow = TimeWindows.of(Duration.ofDays(1));
>         TimeWindows hourWindow = TimeWindows.of(Duration.ofHours(1));
>
>         StreamsBuilder builder = new StreamsBuilder();
>
>         KTable<String, Long> createdPagesUserTypeTable =
> builder.stream("temp-create-stream", Consumed.with(Serdes.String(),
> WikiEventSerdes.WikiEvent()))
>         .selectKey((ignored, value) ->
> value.getUserType()).groupByKey().count();
>
>
>         Topology topology = builder.build();
>         KafkaStreams streams = new KafkaStreams(topology, props);
>         CountDownLatch latch = new CountDownLatch(1);
>
>
>         // attach shutdown handler to catch control-c
>         Runtime.getRuntime().addShutdownHook(new
> Thread("streams-shutdown-hook") {
>             @Override
>             public void run() {
>                 streams.close();
>                 latch.countDown();
>             }
>         });
>
>         try {
>             streams.start();
>             latch.await();
>         } catch (Throwable e) {
>             System.exit(1);
>         }
>         System.exit(0);
>     }
>
>
>
> Can someone please help me figure out what's wrong here?
>
> Thanks,
>
> Meir
>


-- 
-- Guozhang