Posted to dev@kafka.apache.org by "John Roesler (Jira)" <ji...@apache.org> on 2019/10/17 20:40:00 UTC

[jira] [Resolved] (KAFKA-8902) Benchmark cooperative vs eager rebalancing

     [ https://issues.apache.org/jira/browse/KAFKA-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler resolved KAFKA-8902.
---------------------------------
    Resolution: Fixed

I wrote a simple Streams application using the Processor API to update 10 stores with every record seen.

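{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

final class BenchmarkTopology {
    static Topology build() {
        final int numStores = 10;
        final Topology topology = new Topology();

        // Read String keys/values from the pre-loaded input topic.
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "table-in");

        topology.addProcessor(
            "processor",
            (ProcessorSupplier<String, String>) () -> new Processor<String, String>() {
                private final List<KeyValueStore<String, String>> stores = new ArrayList<>(numStores);

                @SuppressWarnings("unchecked")
                @Override
                public void init(final ProcessorContext context) {
                    // Look up the stores connected to this processor (registered below).
                    for (int i = 0; i < numStores; i++) {
                        stores.add((KeyValueStore<String, String>) context.getStateStore("store" + i));
                    }
                }

                @Override
                public void process(final String key, final String value) {
                    // Write every record to every store: numStores puts per record.
                    for (final KeyValueStore<String, String> store : stores) {
                        store.put(key, value);
                    }
                }

                @Override
                public void close() {
                    stores.clear();
                }
            },
            "source"
        );

        // Assumed store registration (not shown in the original snippet). Swap
        // persistentKeyValueStore for inMemoryKeyValueStore to get the in-memory
        // variant; caching and logging are enabled as described below.
        for (int i = 0; i < numStores; i++) {
            topology.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("store" + i),
                        Serdes.String(),
                        Serdes.String())
                    .withCachingEnabled()
                    .withLoggingEnabled(Collections.emptyMap()),
                "processor"
            );
        }
        return topology;
    }
}
{code}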

I tested this topology using both in-memory and on-disk stores, with caching and logging enabled.

My benchmark consisted of running one KafkaStreams instance and measuring its metrics while simulating other nodes joining and leaving the cluster (the simulated nodes participate in the consumer group protocol but don't do any actual processing). I tested three cluster rebalance scenarios:
* scale up: start with 100 partitions / 10 nodes = 10 tasks per node; run 4 minutes; add one node (each node loses one task); run 2 minutes; add one node (each node loses another task); run 2 minutes; add two nodes (each node loses one more task); end the test at the 10-minute mark
* rolling bounce: start with 100 partitions / 10 nodes = 10 tasks per node; run 4 minutes; bounce each node in turn (waiting for it to rejoin and for all nodes to return to RUNNING before proceeding); end the test at the 10-minute mark
* full bounce: start with 100 partitions / 10 nodes = 10 tasks per node; run 4 minutes; bounce every node in the cluster without waiting (so they all leave and rejoin at once); end the test at the 10-minute mark
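The per-node task counts in the scale-up scenario follow from spreading 100 tasks as evenly as possible over 10, 11, 12, and then 14 nodes. The sketch below assumes a perfectly even spread (the real Streams assignor also takes stickiness and standbys into account), and the class and method names are hypothetical:

{code:java}
/**
 * Hypothetical helper: how many tasks each node holds when a fixed number of
 * tasks is spread as evenly as possible over a given number of nodes.
 * Assumes a perfectly even spread, not the real Streams assignor's output.
 */
final class TaskSpread {

    /** Returns {minTasksPerNode, maxTasksPerNode} for an even spread. */
    static int[] spread(final int tasks, final int nodes) {
        final int min = tasks / nodes;                         // every node gets at least this many
        final int max = (tasks % nodes == 0) ? min : min + 1;  // leftover tasks go one per node
        return new int[] {min, max};
    }

    public static void main(final String[] args) {
        // Node counts in the scale-up scenario: start at 10, add 1, add 1, then add 2.
        final int[] nodeCounts = {10, 11, 12, 14};
        for (final int n : nodeCounts) {
            final int[] s = spread(100, n);
            System.out.printf("%d nodes: %d-%d tasks per node%n", n, s[0], s[1]);
        }
    }
}
{code}

Running it prints ranges of 10-10, 9-10, 8-9, and 7-8 tasks per node: the minimum drops by exactly one at each step, which is the sense in which each node gives up one task.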

For input data, I randomly generated two datasets, one with 10,000 keys and one with 100,000 keys, all with 1kB values. This data was pre-loaded into the broker with compaction and retention disabled, so that every test iteration would see the same sequence of updates.
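A minimal sketch of generating such a dataset, assuming values are 1,024 random lowercase characters and keys are drawn uniformly from the key space so that each key is updated many times. The seed, key format, record count, and class name are illustrative assumptions, not details from the benchmark:

{code:java}
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Hypothetical dataset generator: updates over a fixed key space
 * (e.g. 10,000 or 100,000 keys), each with a ~1kB random string value.
 */
final class DatasetGenerator {
    static final int VALUE_SIZE = 1024; // "1kB" assumed to mean 1,024 characters

    static List<Map.Entry<String, String>> generate(final int numKeys, final int numRecords, final long seed) {
        final Random random = new Random(seed); // same seed => same sequence of records
        final List<Map.Entry<String, String>> records = new ArrayList<>(numRecords);
        final char[] buf = new char[VALUE_SIZE];
        for (int i = 0; i < numRecords; i++) {
            // Keys repeat across records, so the stores see updates, not just inserts.
            final String key = "key-" + random.nextInt(numKeys);
            for (int j = 0; j < VALUE_SIZE; j++) {
                buf[j] = (char) ('a' + random.nextInt(26));
            }
            // new String(buf) copies the buffer, so reusing buf is safe.
            records.add(new AbstractMap.SimpleImmutableEntry<>(key, new String(buf)));
        }
        return records;
    }
}
{code}

The seed only makes regeneration reproducible. What guarantees identical input across trials is that the data is written once and kept in full (no compaction, no retention-based deletion), so every run can replay exactly the same records.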

I ran all the benchmarks on AWS i3.large instances, with a dedicated broker node running on a separate i3.large instance.

For each test configuration and scenario, I ran 20 independent trials and discarded the highest and lowest results (to exclude outliers), leaving 18 data points. The key metric was the overall throughput of a single node during the test.
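The aggregation step can be sketched as: sort the 20 per-trial throughput values, drop the single lowest and highest, and average the remaining 18. Throughput is taken here as records processed divided by elapsed seconds; that definition and the helper names are assumptions for illustration:

{code:java}
import java.util.Arrays;

/**
 * Hypothetical sketch of the trial aggregation: drop the min and max
 * per-trial throughput values and summarize the rest.
 */
final class TrialAggregation {

    /** Returns the trials with the single lowest and highest removed (20 trials -> 18). */
    static double[] dropExtremes(final double[] trials) {
        if (trials.length < 3) {
            throw new IllegalArgumentException("need at least 3 trials");
        }
        final double[] sorted = trials.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        return Arrays.copyOfRange(sorted, 1, sorted.length - 1);
    }

    /** Assumed definition: records processed by one node divided by elapsed seconds. */
    static double throughput(final long recordsProcessed, final double elapsedSeconds) {
        return recordsProcessed / elapsedSeconds;
    }

    static double mean(final double[] xs) {
        return Arrays.stream(xs).average().orElse(Double.NaN);
    }
}
{code}

For a 10-minute run, `throughput(records, 600.0)` gives records per second for that trial.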

I compared the results across three builds:
* 2.3.1-SNAPSHOT (the current head of the 2.3 branch) - Eager protocol
* 2.4.0-SNAPSHOT (the current head of the 2.4 branch) - Cooperative protocol
* a modified 2.4.0-SNAPSHOT with cooperative rebalancing disabled - Eager protocol

I found that, under every scenario, data set, and configuration, all three versions performed the same: none of the differences was statistically significant at the 99.9% level.
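The report doesn't say which statistical test backs the 99.9% threshold. One common choice for comparing two sets of 18 throughput samples is Welch's two-sample t-test, sketched here as an assumption (the class and method names are hypothetical):

{code:java}
/**
 * Welch's two-sample t statistic and Welch-Satterthwaite degrees of freedom.
 * Assumed here as one plausible test; the benchmark report doesn't name its test.
 */
final class WelchTTest {

    static double mean(final double[] xs) {
        double sum = 0;
        for (final double x : xs) {
            sum += x;
        }
        return sum / xs.length;
    }

    /** Unbiased sample variance (divides by n - 1). */
    static double variance(final double[] xs) {
        final double m = mean(xs);
        double ss = 0;
        for (final double x : xs) {
            ss += (x - m) * (x - m);
        }
        return ss / (xs.length - 1);
    }

    /** t = (meanA - meanB) / sqrt(varA / nA + varB / nB). */
    static double tStatistic(final double[] a, final double[] b) {
        final double se2 = variance(a) / a.length + variance(b) / b.length;
        return (mean(a) - mean(b)) / Math.sqrt(se2);
    }

    /** Welch-Satterthwaite approximation of the degrees of freedom. */
    static double degreesOfFreedom(final double[] a, final double[] b) {
        final double va = variance(a) / a.length;
        final double vb = variance(b) / b.length;
        return (va + vb) * (va + vb)
            / (va * va / (a.length - 1) + vb * vb / (b.length - 1));
    }
}
{code}

|t| is then compared against the two-sided critical value of Student's t distribution at alpha = 0.001 for the computed degrees of freedom; a smaller |t| means the difference between two builds is not significant at that level.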

I didn't see any marked improvement from cooperative rebalancing alone, but it is only the foundation for several follow-on improvements. Just as important, I didn't find any regression from the new protocol implementation.

> Benchmark cooperative vs eager rebalancing
> ------------------------------------------
>
>                 Key: KAFKA-8902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8902
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.4.0
>
>
> Cause rebalance and measure:
> * overall throughput
> * paused time
> * (also look at the metrics from (https://issues.apache.org/jira/browse/KAFKA-8609)):
> ** accumulated rebalance time
> Cluster/topic sizing:
> ** 10 instances
> ** 100 tasks (each instance gets 10 tasks)
> ** 1000 stores (each task gets 10 stores)
> * standbys = [0 and 1]
> Rolling bounce:
> * with and without state loss
> * shorter and faster than session timeout (shorter in particular should be interesting)
> Expand (from 9 to 10)
> Contract (from 10 to 9)
> With and without saturation:
> EOS:
> * with and without
> Topology:
> * stateful
> * windowed agg
> Key Parameterizations:
> 1. control: no rebalances
> 2. rolling without state loss faster than session timeout
> 3. expand 9 to 10
> 4. contract 10 to 9



--
This message was sent by Atlassian Jira
(v8.3.4#803005)