Posted to jira@kafka.apache.org by "Vinoth Chandar (Jira)" <ji...@apache.org> on 2019/08/26 17:13:00 UTC

[jira] [Comment Edited] (KAFKA-8831) Joining a new instance sometimes does not cause rebalancing

    [ https://issues.apache.org/jira/browse/KAFKA-8831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16915954#comment-16915954 ] 

Vinoth Chandar edited comment on KAFKA-8831 at 8/26/19 5:12 PM:
----------------------------------------------------------------

[~cpettitt-confluent] I am not seeing it recover after 30 seconds. But in my scenario there is also state restoration involved, so could it take longer? Let me investigate more and report back here.

FWIW, here is the snippet of my DAG, in case anyone is interested.

{code:java}
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, config.optionSet.valueOf(config.serverUriSpec));
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Actual Streams benchmark DAG
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, byte[]> source = builder.stream(workload.inputTopicName());
KGroupedStream<String, byte[]> groupedStream = source
    .selectKey((key, value) -> Workload.groupKey(key))
    .groupByKey();
groupedStream.windowedBy(TimeWindows.of(Duration.ofSeconds(workload.aggregationWindowSecs())))
    .aggregate(
        () -> new byte[0], // 0 byte initializer
        // accept the new value, to generate some variable length updates
        (aggKey, newValue, aggValue) -> newValue,
        Materialized.<String, byte[], WindowStore<Bytes, byte[]>>as(workload.stateStoreName())
            .withValueSerde(Serdes.ByteArray())
    ).toStream().print(Printed.toSysOut());

final KafkaStreams streams = new KafkaStreams(builder.build(), props);
{code}
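
For anyone reading the DAG without a cluster handy, here is a rough plain-Java sketch of what the windowed aggregate above computes. No Kafka classes are involved. The class name, the in-memory map, the "groupKey@windowStart" store key and the 10 second window are all stand-ins I made up for illustration. The only things taken from the snippet are the 0 byte initializer and the "newest value wins" aggregator. It assumes non-negative timestamps and epoch-aligned tumbling windows, which is how I understand TimeWindows.of(...) to behave when no advance is set.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WindowedLatestValueSketch {

    // Tumbling windows aligned to the epoch: every non-negative timestamp
    // falls into exactly one window, identified by its start time.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Mirrors the aggregator (aggKey, newValue, aggValue) -> newValue.
    static byte[] aggregate(String aggKey, byte[] newValue, byte[] aggValue) {
        return newValue;
    }

    // Applies one record to an in-memory stand-in for the window store.
    // The "groupKey@windowStart" key format is hypothetical.
    static void process(Map<String, byte[]> store, String groupKey, byte[] value,
                        long timestampMs, long windowSizeMs) {
        String storeKey = groupKey + "@" + windowStart(timestampMs, windowSizeMs);
        byte[] current = store.getOrDefault(storeKey, new byte[0]); // 0 byte initializer
        store.put(storeKey, aggregate(groupKey, value, current));
    }

    public static void main(String[] args) {
        Map<String, byte[]> store = new LinkedHashMap<>();
        long windowSizeMs = 10_000L; // hypothetical window size

        process(store, "g1", new byte[] {1, 2, 3}, 1_000L, windowSizeMs);
        process(store, "g1", new byte[] {9}, 4_000L, windowSizeMs);      // same window, replaces the value
        process(store, "g1", new byte[] {7, 7}, 12_000L, windowSizeMs);  // lands in the next window

        store.forEach((k, v) -> System.out.println(k + " -> " + v.length + " bytes"));
    }
}
```

Since each update replaces the previous value for its window, the size of the stored value changes with every record, which is the "variable length updates" effect the benchmark is after.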


was (Author: vc):
[~cpettitt-confluent] I am not seeing it recover after 30 seconds.  but in my scenario, there is also state restoration involved, so could be longer?  Let me investigate more and report back here.  

> Joining a new instance sometimes does not cause rebalancing
> -----------------------------------------------------------
>
>                 Key: KAFKA-8831
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8831
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Chris Pettitt
>            Assignee: Chris Pettitt
>            Priority: Major
>         Attachments: StandbyTaskTest.java, fail.log
>
>
> See attached log. The application is in a REBALANCING state. The second instance joins a bit after the first instance (~250ms). The group coordinator says it is going to rebalance but nothing happens. The first instance gets all partitions (2). The application transitions to RUNNING.
> See attached test, which starts one client and then starts another about 250ms later. This seems to consistently repro the issue for me.
> This is blocking my work on KAFKA-8755, so I'm inclined to pick it up.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)