Posted to jira@kafka.apache.org by "Stanislav Kozlovski (JIRA)" <ji...@apache.org> on 2018/10/16 12:39:00 UTC

[jira] [Updated] (KAFKA-7514) Trogdor - Support Multiple Threads in ConsumeBenchWorker

     [ https://issues.apache.org/jira/browse/KAFKA-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski updated KAFKA-7514:
---------------------------------------
    Description: 
Trogdor's ConsumeBenchWorker currently uses only two threads - one for the StatusUpdater:
{code:java}
this.statusUpdaterFuture = executor.scheduleAtFixedRate(
        new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES);
{code}
and one for the consumer task itself:
{code:java}
executor.submit(new ConsumeMessages(partitions));
{code}
A sample ConsumeBenchSpec specification in JSON looks like this:
{code:java}
{
    "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
    "durationMs": 10000000,
    "consumerNode": "node0",
    "bootstrapServers": "localhost:9092",
    "maxMessages": 100,
    "activeTopics": {
        "foo[1-3]": {
            "numPartitions": 3,
            "replicationFactor": 1
        }
    }
}
{code}
 

 
h2. Motivation

This does not make good use of machines with multiple cores. It would be useful to be able to configure the ConsumeBenchSpec to use multiple threads and spawn multiple consumers. The resulting parallelism would also allow the ConsumeBenchWorker to sustain higher throughput.

 
h2. Proposal

Add a new `consumerThreads` property to the ConsumeBenchSpec that lets you run multiple consumers in parallel.

h3. Changes

 

By default, `consumerThreads` will have a value of 1.
`activeTopics` will still be defined in the same way. The topics will be evenly assigned to the consumers in a round-robin fashion.
For example, given this configuration:
{code:java}
{
    "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
    "durationMs": 10000000,
    "consumerNode": "node0",
    "bootstrapServers": "localhost:9092",
    "maxMessages": 100,
    "consumerThreads": 2,
    "activeTopics": {
        "foo[1-4]": {
            "numPartitions": 4,
            "replicationFactor": 1
        }
    }
}
{code}
Consumer thread 1 will be assigned partitions [foo1, foo3].
Consumer thread 2 will be assigned partitions [foo2, foo4].

The ConsumeBenchWorker will then spawn 4 threads in total in the executor (2 for every consumer).
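
The round-robin assignment described above could look roughly like the sketch below. It is only an illustration: `RoundRobinAssignmentSketch` and `assign` are hypothetical names that do not exist in Trogdor, and the input is assumed to be the already-expanded topic list (foo1 to foo4 for `foo[1-4]`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Trogdor: spreads topics over consumer threads round-robin. */
final class RoundRobinAssignmentSketch {

    /**
     * Assigns topics.get(i) to consumer thread (i % consumerThreads).
     * Returns one list of topic names per consumer thread, in thread order.
     */
    static List<List<String>> assign(List<String> topics, int consumerThreads) {
        if (consumerThreads < 1) {
            throw new IllegalArgumentException("consumerThreads must be at least 1");
        }
        List<List<String>> assignments = new ArrayList<>();
        for (int i = 0; i < consumerThreads; i++) {
            assignments.add(new ArrayList<>());
        }
        for (int i = 0; i < topics.size(); i++) {
            assignments.get(i % consumerThreads).add(topics.get(i));
        }
        return assignments;
    }

    public static void main(String[] args) {
        // Assumed expansion of "foo[1-4]" from the example spec.
        List<String> topics = Arrays.asList("foo1", "foo2", "foo3", "foo4");
        List<List<String>> result = assign(topics, 2);
        for (int i = 0; i < result.size(); i++) {
            // Prints "consumer thread 1 -> [foo1, foo3]" and "consumer thread 2 -> [foo2, foo4]".
            System.out.println("consumer thread " + (i + 1) + " -> " + result.get(i));
        }
    }
}
```

With `consumerThreads` set to 1, the same loop hands every topic to the single consumer, which matches the default behaviour proposed here.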
h3. Status

The way the worker's status is reported will be updated as well.
A ConsumeBenchWorker currently shows the following status when queried with `./bin/trogdor.sh client --show-tasks localhost:8889`:

 
{code:java}
"tasks" : { "consume_bench_19938" : { "state" : "DONE", "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", 
...
"status" : { "totalMessagesReceived" : 190, "totalBytesReceived" : 98040, "averageMessageSizeBytes" : 516, "averageLatencyMs" : 449.0, "p50LatencyMs" : 449, "p95LatencyMs" : 449, "p99LatencyMs" : 449 } },
{code}
We will change it to show the status of each consumer separately, along with the topic partitions it was assigned:
{code:java}
"tasks" : { "consume_bench_19938" : { "state" : "DONE", "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", 
...
"status":{  
   "consumer-1":{  
      "assignedPartitions":[  
         "foo1",
         "foo3"
      ],
      "totalMessagesReceived":190,
      "totalBytesReceived":98040,
      "averageMessageSizeBytes":516,
      "averageLatencyMs":449.0,
      "p50LatencyMs":449,
      "p95LatencyMs":449,
      "p99LatencyMs":449
   },
   "consumer-2":{  
      "assignedPartitions":[  
         "foo2",
         "foo4"
      ],
      "totalMessagesReceived":190,
      "totalBytesReceived":98040,
      "averageMessageSizeBytes":516,
      "averageLatencyMs":449.0,
      "p50LatencyMs":449,
      "p95LatencyMs":449,
      "p99LatencyMs":449
   }
}
},
{code}
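
One possible way to assemble a status object of this shape is sketched below. This is an assumption-laden illustration, not Trogdor's actual status classes: `PerConsumerStatusSketch`, `consumerStatus` and `workerStatus` are hypothetical names, and only a subset of the fields (no latency percentiles) is populated.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the proposed per-consumer status layout. */
final class PerConsumerStatusSketch {

    /** Builds the status entry for a single consumer (latency fields omitted for brevity). */
    static Map<String, Object> consumerStatus(List<String> assignedPartitions,
                                              long totalMessagesReceived,
                                              long totalBytesReceived) {
        Map<String, Object> status = new LinkedHashMap<>();
        status.put("assignedPartitions", assignedPartitions);
        status.put("totalMessagesReceived", totalMessagesReceived);
        status.put("totalBytesReceived", totalBytesReceived);
        // Integer average, guarded against division by zero when nothing was consumed.
        status.put("averageMessageSizeBytes",
                totalMessagesReceived == 0 ? 0L : totalBytesReceived / totalMessagesReceived);
        return status;
    }

    /** Keys the per-consumer entries as "consumer-1", "consumer-2", ... in thread order. */
    static Map<String, Object> workerStatus(List<Map<String, Object>> perConsumer) {
        Map<String, Object> status = new LinkedHashMap<>();
        for (int i = 0; i < perConsumer.size(); i++) {
            status.put("consumer-" + (i + 1), perConsumer.get(i));
        }
        return status;
    }
}
```

A `LinkedHashMap` is used so that the consumers and their fields keep a stable order when the map is serialized to JSON.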
 

 
h2. Backwards Compatibility

This change should be mostly backwards-compatible. If `consumerThreads` is not passed, only one consumer will be created and the round-robin assignor will assign every partition to it.

The only change will be in the format of the reported status. Even with one consumer, we will still show a status similar to:
{code:java}
"status":{  
   "consumer-1":{  
      "assignedPartitions":[  
         "foo1",
         "foo3"
      ],
      "totalMessagesReceived":190,
      "totalBytesReceived":98040,
      "averageMessageSizeBytes":516,
      "averageLatencyMs":449.0,
      "p50LatencyMs":449,
      "p95LatencyMs":449,
      "p99LatencyMs":449
   }
}
{code}
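
Tooling that reads the status may need to cope with both layouts during a transition. The sketch below shows one way a reader could do that, assuming the status JSON has already been parsed into nested maps. `StatusFormatCompatSketch` is a hypothetical name, and the parsing step is left out on purpose.

```java
import java.util.Map;

/** Hypothetical reader that accepts both the flat and the per-consumer status layout. */
final class StatusFormatCompatSketch {

    /** Returns the total number of messages received, whichever layout the status uses. */
    static long totalMessagesReceived(Map<String, Object> status) {
        Object flat = status.get("totalMessagesReceived");
        if (flat instanceof Number) {
            // Old layout: the counters sit directly under "status".
            return ((Number) flat).longValue();
        }
        // New layout: one nested object per consumer ("consumer-1", "consumer-2", ...).
        long total = 0;
        for (Object value : status.values()) {
            if (value instanceof Map) {
                Object count = ((Map<?, ?>) value).get("totalMessagesReceived");
                if (count instanceof Number) {
                    total += ((Number) count).longValue();
                }
            }
        }
        return total;
    }
}
```

For the two-consumer example status shown earlier, this would return 380 (190 per consumer). For the old flat status it would return 190.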
 

  was:
Trogdor's ConsumeBenchWorker currently uses only two threads - one for the StatusUpdater:
{code:java}
this.statusUpdaterFuture = executor.scheduleAtFixedRate(
        new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES);
{code}
and one for the consumer task itself
{code:java}
executor.submit(new ConsumeMessages(partitions));
{code}
A sample ConsumeBenchSpec specification in JSON looks like this:
{code:java}
{
    "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
    "durationMs": 10000000,
    "consumerNode": "node0",
    "bootstrapServers": "localhost:9092",
    "maxMessages": 100,
    "activeTopics": {
        "foo[1-3]": {
            "numPartitions": 3,
            "replicationFactor": 1
        }
    }
}
{code}
 

 
h2. Motivation


This does not make the best use of machines with multiple cores. It would be useful if there was a way to configure the ConsumeBenchSpec to use multiple threads and spawn multiple consumers. This would also allow the ConsumeBenchWorker to work with a higher amount of throughput due to the parallelism taking place.

 
h2. 
Proposal:

Add a new `consumerThreads` property to the ConsumeBenchSpec allowing you to run multiple consumers in parallel



Changes

 

By default, it will have a value of 1.
`activeTopics` will still be defined in the same way. They will be evenly assigned to the consumers in a round-robin fashion.
For example, if we have this configuration
{code:java}
{
    "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
    "durationMs": 10000000,
    "consumerNode": "node0",
    "bootstrapServers": "localhost:9092",
    "maxMessages": 100,
    "consumerThreads": 2,
    "activeTopics": {
        "foo[1-4]": {
            "numPartitions": 4,
            "replicationFactor": 1
        }
    }
}{code}
consumer thread 1 will be assigned partitions [foo1, foo3]
consumer thread 2 will be assigned partitions [foo2, foo4]

and the ConsumeBenchWorker will spawn 4 threads in total in the executor (2 for every consumer)




h3. Status

The way the worker's status will be updated as well. 
A ConsumeBenchWorker shows the following status when queried with `./bin/trogdor.sh client --show-tasks localhost:8889`

 
{code:java}
"tasks" : { "consume_bench_19938" : { "state" : "DONE", "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", 
...
"status" : { "totalMessagesReceived" : 190, "totalBytesReceived" : 98040, "averageMessageSizeBytes" : 516, "averageLatencyMs" : 449.0, "p50LatencyMs" : 449, "p95LatencyMs" : 449, "p99LatencyMs" : 449 } },{code}
We will change it to show the status of every separate consumer and the topic partitions it was assigned to
{code:java}
"tasks" : { "consume_bench_19938" : { "state" : "DONE", "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", 
...
"status" : { 
"consumer-1":
{
"assignedPartitions": ["foo1", "foo3"],
"totalMessagesReceived" : 190, "totalBytesReceived" : 98040, "averageMessageSizeBytes" : 516, "averageLatencyMs" : 449.0, "p50LatencyMs" : 449, "p95LatencyMs" : 449, "p99LatencyMs" : 449 
}
"consumer-2":
{
"assignedPartitions": ["foo2", "foo4"],
"totalMessagesReceived" : 190, "totalBytesReceived" : 98040, "averageMessageSizeBytes" : 516, "averageLatencyMs" : 449.0, "p50LatencyMs" : 449, "p95LatencyMs" : 449, "p99LatencyMs" : 449 
}
} 

},{code}
 

 
h2. 
Backwards Compatibility:

This change should be mostly backwards-compatible. If the `consumerThreads` is not passed - only one consumer will be created and the round-robin assignor will assign every partition to it.

The only change will be in the format of the reported status. Even with one consumer, we will still show a status similar to
{code:java}
"status" : { "consumer-1": { "assignedPartitions": ["foo1", "foo3"], "totalMessagesReceived" : 190, "totalBytesReceived" : 98040, "averageMessageSizeBytes" : 516, "averageLatencyMs" : 449.0, "p50LatencyMs" : 449, "p95LatencyMs" : 449, "p99LatencyMs" : 449 }
}
{code}
 


> Trogdor - Support Multiple Threads in ConsumeBenchWorker
> --------------------------------------------------------
>
>                 Key: KAFKA-7514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7514
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Minor


