You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/29 22:00:29 UTC
kafka git commit: KAFKA-4843: More efficient round-robin scheduler
Repository: kafka
Updated Branches:
refs/heads/trunk 6feaa8a58 -> 84a14fec2
KAFKA-4843: More efficient round-robin scheduler
- Improves streams efficiency by more than 200K requests/second (small 100 byte requests)
- Gets streams efficiency very close to pure consumer (see results in https://jenkins.confluent.io/job/system-test-kafka-branch-builder/746/console)
- Maintains same fairness across tasks
- Schedules all records in the queue in-between poll() calls, not just one per task.
Author: Eno Thereska <en...@confluent.io>
Author: Eno Thereska <en...@gmail.com>
Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang
Closes #2643 from enothereska/minor-schedule-round-robin
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84a14fec
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84a14fec
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84a14fec
Branch: refs/heads/trunk
Commit: 84a14fec299749a208251bce1a0eb9c1a8241d08
Parents: 6feaa8a
Author: Eno Thereska <en...@confluent.io>
Authored: Wed Mar 29 15:00:26 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Mar 29 15:00:26 2017 -0700
----------------------------------------------------------------------
.../streams/processor/internals/StreamTask.java | 17 +-
.../processor/internals/StreamThread.java | 225 ++++++++++++-------
.../processor/internals/StreamTaskTest.java | 45 ++--
.../streams/streams_simple_benchmark_test.py | 4 +-
tests/kafkatest/services/streams.py | 2 +-
5 files changed, 188 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7bd4be4..092d6e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -169,19 +169,26 @@ public class StreamTask extends AbstractTask implements Punctuator {
}
/**
- * Process one record
+ * @return The number of records left in the buffer of this task's partition group
+ */
+ public int numBuffered() {
+ return partitionGroup.numBuffered();
+ }
+
+ /**
+ * Process one record.
*
- * @return number of records left in the buffer of this task's partition group after the processing is done
+ * @return true if this method processes a record, false if it does not process a record.
*/
@SuppressWarnings("unchecked")
- public int process() {
+ public boolean process() {
// get the next record to process
StampedRecord record = partitionGroup.nextRecord(recordInfo);
// if there is no record to process, return immediately
if (record == null) {
requiresPoll = true;
- return 0;
+ return false;
}
requiresPoll = false;
@@ -224,7 +231,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
processorContext.setCurrentNode(null);
}
- return partitionGroup.numBuffered();
+ return true;
}
private void updateProcessorContext(final ProcessorRecordContext recordContext, final ProcessorNode currNode) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 61d7d72..b90bde5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -223,6 +223,7 @@ public class StreamThread extends Thread {
private final TaskCreator taskCreator = new TaskCreator();
final ConsumerRebalanceListener rebalanceListener;
+ private final static int UNLIMITED_RECORDS = -1;
public synchronized boolean isInitialized() {
return state == State.RUNNING;
@@ -519,107 +520,168 @@ public class StreamThread extends Thread {
return Math.max(this.timerStartedMs - previousTimeMs, 0);
}
- private void runLoop() {
- int totalNumBuffered = 0;
- boolean requiresPoll = true;
- boolean polledRecords = false;
-
- consumer.subscribe(sourceTopicPattern, rebalanceListener);
-
- while (stillRunning()) {
- this.timerStartedMs = time.milliseconds();
-
- // try to fetch some records if necessary
- if (requiresPoll) {
- requiresPoll = false;
-
- boolean longPoll = totalNumBuffered == 0;
+ /**
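+ * Get the next batch of records by polling, resetting offsets per the reset policy if a partition has no valid offset.
+ * @return Next batch of records, or null if the poll threw NoOffsetForPartitionException and offsets were reset instead.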
+ */
+ private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
+ ConsumerRecords<byte[], byte[]> records = null;
- ConsumerRecords<byte[], byte[]> records = null;
+ try {
+ records = consumer.poll(pollTimeMs);
+ } catch (NoOffsetForPartitionException ex) {
+ TopicPartition partition = ex.partition();
+ if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
+ log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic()));
+ consumer.seekToBeginning(ex.partitions());
+ } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
+ consumer.seekToEnd(ex.partitions());
+ log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic()));
+ } else {
- try {
- records = consumer.poll(longPoll ? this.pollTimeMs : 0);
- } catch (NoOffsetForPartitionException ex) {
- TopicPartition partition = ex.partition();
- if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
- log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic()));
- consumer.seekToBeginning(ex.partitions());
- } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
- consumer.seekToEnd(ex.partitions());
- log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic()));
- } else {
+ if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
+ setState(State.PENDING_SHUTDOWN);
+ String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
+ " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
+ "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
+ throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex);
+ }
- if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
- setState(State.PENDING_SHUTDOWN);
- String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
- " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
- "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
- throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex);
- }
+ if (originalReset.equals("earliest")) {
+ consumer.seekToBeginning(ex.partitions());
+ } else if (originalReset.equals("latest")) {
+ consumer.seekToEnd(ex.partitions());
+ }
+ log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset));
+ }
- if (originalReset.equals("earliest")) {
- consumer.seekToBeginning(ex.partitions());
- } else if (originalReset.equals("latest")) {
- consumer.seekToEnd(ex.partitions());
- }
- log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset));
- }
+ }
- }
+ if (rebalanceException != null)
+ throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
- if (rebalanceException != null)
- throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
+ return records;
+ }
- if (records != null && !records.isEmpty()) {
- int numAddedRecords = 0;
+ /**
+ * Take records and add them to each respective task
+ * @param records Records, can be null
+ */
+ private void addRecordsToTasks(ConsumerRecords<byte[], byte[]> records) {
+ if (records != null && !records.isEmpty()) {
+ int numAddedRecords = 0;
- for (TopicPartition partition : records.partitions()) {
- StreamTask task = activeTasksByPartition.get(partition);
- numAddedRecords += task.addRecords(partition, records.records(partition));
- }
- streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
- polledRecords = true;
- } else {
- polledRecords = false;
- }
+ for (TopicPartition partition : records.partitions()) {
+ StreamTask task = activeTasksByPartition.get(partition);
+ numAddedRecords += task.addRecords(partition, records.records(partition));
+ }
+ streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
+ }
+ }
- // only record poll latency is long poll is required
- if (longPoll) {
- streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
+ /**
+ * Schedule the records processing by selecting which record is processed next. Commits may
+ * happen as records are processed.
+ * @param tasks The tasks to process records from; tasks with nothing buffered are simply skipped.
+ * @param recordsProcessedBeforeCommit number of records to be processed before commit is called.
+ * if UNLIMITED_RECORDS, then maybeCommit is never called while records are being processed
+ * @return Number of records processed since the last maybeCommit call made by this method.
+ */
+ private long processAndPunctuate(final Map<TaskId, StreamTask> tasks,
+ final long recordsProcessedBeforeCommit) {
+
+ long totalProcessedEachRound;
+ long totalProcessedSinceLastMaybeCommit = 0;
+ // Round-robin scheduling by taking one record from each task repeatedly
+ // until no task has any records left
+ do {
+ totalProcessedEachRound = 0;
+ for (StreamTask task : tasks.values()) {
+ // process() returns true only if it processed one record;
+ // anything still buffered waits for the next round
+ if (task.process()) {
+ totalProcessedEachRound++;
+ totalProcessedSinceLastMaybeCommit++;
}
}
+ if (recordsProcessedBeforeCommit != UNLIMITED_RECORDS &&
+ totalProcessedSinceLastMaybeCommit >= recordsProcessedBeforeCommit) {
+ long processLatency = computeLatency();
+ streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessedSinceLastMaybeCommit,
+ timerStartedMs);
+ maybeCommit(this.timerStartedMs);
+ totalProcessedSinceLastMaybeCommit = 0; // reset only after the average is recorded, otherwise we divide by zero
+ }
+ } while (totalProcessedEachRound != 0);
- // try to process one fetch record from each task via the topology, and also trigger punctuate
- // functions if necessary, which may result in more records going through the topology in this loop
- if (totalNumBuffered > 0 || polledRecords) {
- totalNumBuffered = 0;
-
- if (!activeTasks.isEmpty()) {
- for (StreamTask task : activeTasks.values()) {
+ // go over the tasks again to punctuate or commit
+ for (StreamTask task : tasks.values()) {
+ maybePunctuate(task);
+ if (task.commitNeeded())
+ commitOne(task);
+ }
- totalNumBuffered += task.process();
+ return totalProcessedSinceLastMaybeCommit;
+ }
- requiresPoll = requiresPoll || task.requiresPoll();
+ /**
+ * Adjust the number of records that should be processed by scheduler. This avoids
+ * scenarios where the processing time is higher than the commit time.
+ * @param prevRecordsProcessedBeforeCommit Previous number of records processed by scheduler.
+ * @param totalProcessed Total number of records processed in this last round.
+ * @param processLatency Total processing latency in ms of this last round.
+ * @param commitTime Desired commit time in ms.
+ * @return An adjusted number of records to be processed in the next round, or UNLIMITED_RECORDS if no limit applies.
+ */
+ private long adjustRecordsProcessedBeforeCommit(final long prevRecordsProcessedBeforeCommit, final long totalProcessed,
+ final long processLatency, final long commitTime) {
+ long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
+ // check if process latency larger than commit latency
+ // note that once recordsProcessedBeforeCommit is bounded it stays bounded unless a later round measures
+ // zero latency (then neither branch below fires and UNLIMITED_RECORDS is returned). This might be an issue
+ // if the initial measurement was off due to a slow start.
+ if (processLatency > commitTime) {
+ // push down
+ recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency);
+ log.debug("{} processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}",
+ logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit);
+ } else if (prevRecordsProcessedBeforeCommit != UNLIMITED_RECORDS && processLatency > 0) {
+ // push up
+ recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency);
+ log.debug("{} processing latency {} <= commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}",
+ logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit);
+ }
- streamsMetrics.processTimeSensor.record(computeLatency(), timerStartedMs);
+ return recordsProcessedBeforeCommit;
+ }
- maybePunctuate(task);
+ /**
+ * Main event loop for polling, and processing records through topologies.
+ */
+ private void runLoop() {
+ long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
+ consumer.subscribe(sourceTopicPattern, rebalanceListener);
- if (task.commitNeeded())
- commitOne(task);
- }
+ while (stillRunning()) {
+ this.timerStartedMs = time.milliseconds();
- } else {
- // even when no task is assigned, we must poll to get a task.
- requiresPoll = true;
+ // try to fetch some records if necessary
+ ConsumerRecords<byte[], byte[]> records = pollRequests(this.pollTimeMs);
+ if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
+ streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
+ addRecordsToTasks(records);
+ final long totalProcessed = processAndPunctuate(activeTasks, recordsProcessedBeforeCommit);
+ if (totalProcessed > 0) {
+ final long processLatency = computeLatency();
+ streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed,
+ timerStartedMs);
+ recordsProcessedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed,
+ processLatency, commitTimeMs);
}
-
- } else {
- requiresPoll = true;
}
+
maybeCommit(timerStartedMs);
maybeUpdateStandbyTasks();
-
maybeClean(timerStartedMs);
}
log.info("{} Shutting down at user request", logPrefix);
@@ -692,8 +754,9 @@ public class StreamThread extends Thread {
protected void maybeCommit(final long now) {
if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
- log.info("{} Committing all active tasks {} and standby tasks {} because the commit interval {}ms has elapsed",
- logPrefix, commitTimeMs, activeTasks, standbyTasks);
+
+ log.info("{} Committing all active tasks {} and standby tasks {} because the commit interval {}ms has elapsed by {}ms",
+ logPrefix, activeTasks, standbyTasks, commitTimeMs, now - lastCommitMs);
commitAll();
lastCommitMs = now;
http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index c1dce59..7c9f46b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -172,27 +172,33 @@ public class StreamTaskTest {
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
- assertEquals(5, task.process());
+ assertTrue(task.process());
+ assertEquals(5, task.numBuffered());
assertEquals(1, source1.numReceived);
assertEquals(0, source2.numReceived);
- assertEquals(4, task.process());
+ assertTrue(task.process());
+ assertEquals(4, task.numBuffered());
assertEquals(2, source1.numReceived);
assertEquals(0, source2.numReceived);
- assertEquals(3, task.process());
+ assertTrue(task.process());
+ assertEquals(3, task.numBuffered());
assertEquals(2, source1.numReceived);
assertEquals(1, source2.numReceived);
- assertEquals(2, task.process());
+ assertTrue(task.process());
+ assertEquals(2, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(1, source2.numReceived);
- assertEquals(1, task.process());
+ assertTrue(task.process());
+ assertEquals(1, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(2, source2.numReceived);
- assertEquals(0, task.process());
+ assertTrue(task.process());
+ assertEquals(0, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(3, source2.numReceived);
}
@@ -234,7 +240,7 @@ public class StreamTaskTest {
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
- assertEquals(5, task.process());
+ assertTrue(task.process());
assertEquals(1, source1.numReceived);
assertEquals(0, source2.numReceived);
@@ -251,21 +257,21 @@ public class StreamTaskTest {
assertTrue(consumer.paused().contains(partition1));
assertTrue(consumer.paused().contains(partition2));
- assertEquals(7, task.process());
+ assertTrue(task.process());
assertEquals(2, source1.numReceived);
assertEquals(0, source2.numReceived);
assertEquals(1, consumer.paused().size());
assertTrue(consumer.paused().contains(partition2));
- assertEquals(6, task.process());
+ assertTrue(task.process());
assertEquals(3, source1.numReceived);
assertEquals(0, source2.numReceived);
assertEquals(1, consumer.paused().size());
assertTrue(consumer.paused().contains(partition2));
- assertEquals(5, task.process());
+ assertTrue(task.process());
assertEquals(3, source1.numReceived);
assertEquals(1, source2.numReceived);
@@ -289,40 +295,47 @@ public class StreamTaskTest {
assertTrue(task.maybePunctuate());
- assertEquals(5, task.process());
+ assertTrue(task.process());
+ assertEquals(5, task.numBuffered());
assertEquals(1, source1.numReceived);
assertEquals(0, source2.numReceived);
assertFalse(task.maybePunctuate());
- assertEquals(4, task.process());
+ assertTrue(task.process());
+ assertEquals(4, task.numBuffered());
assertEquals(1, source1.numReceived);
assertEquals(1, source2.numReceived);
assertTrue(task.maybePunctuate());
- assertEquals(3, task.process());
+ assertTrue(task.process());
+ assertEquals(3, task.numBuffered());
assertEquals(2, source1.numReceived);
assertEquals(1, source2.numReceived);
assertFalse(task.maybePunctuate());
- assertEquals(2, task.process());
+ assertTrue(task.process());
+ assertEquals(2, task.numBuffered());
assertEquals(2, source1.numReceived);
assertEquals(2, source2.numReceived);
assertTrue(task.maybePunctuate());
- assertEquals(1, task.process());
+ assertTrue(task.process());
+ assertEquals(1, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(2, source2.numReceived);
assertFalse(task.maybePunctuate());
- assertEquals(0, task.process());
+ assertTrue(task.process());
+ assertEquals(0, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(3, source2.numReceived);
+ assertFalse(task.process());
assertFalse(task.maybePunctuate());
processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index c9f970e..9f0e457 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -30,12 +30,12 @@ class StreamsSimpleBenchmarkTest(Test):
def __init__(self, test_context):
super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
- self.num_records = 10000000L
+ self.num_records = 20000000L
self.replication = 1
@cluster(num_nodes=9)
- @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 2, 3])
+ @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
def test_simple_benchmark(self, test, scale):
"""
Run simple Kafka Streams benchmark
http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 4f8f1a3..e7be947 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -97,7 +97,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
self.logger.info("Restarting Kafka Streams on " + str(node.account))
self.start_node(node)
- def wait(self, timeout_sec=720):
+ def wait(self, timeout_sec=1440):
for node in self.nodes:
self.wait_node(node, timeout_sec)