You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/08/03 21:28:14 UTC
[kafka] branch 2.1 updated: MINOR: Avoid dividing by zero (#7143)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new d00e741 MINOR: Avoid dividing by zero (#7143)
d00e741 is described below
commit d00e74106c60d9eb2d185e6141b99266a1f9d79f
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Sat Aug 3 14:03:15 2019 -0700
MINOR: Avoid dividing by zero (#7143)
Reviews: A. Sophie Blee-Goldman <so...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Bruno Cadonna <br...@confluent.io>, Boyang Chen <bo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../apache/kafka/streams/processor/internals/StreamThread.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ac0f5ef..6c99797 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1038,7 +1038,7 @@ public class StreamThread extends Thread {
* or if the task producer got fenced (EOS)
*/
boolean maybeCommit() {
- int committed = 0;
+ final int committed;
if (commitTimeMs >= 0 && now - lastCommitMs > commitTimeMs) {
if (log.isTraceEnabled()) {
@@ -1046,7 +1046,7 @@ public class StreamThread extends Thread {
taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
}
- committed += taskManager.commitAll();
+ committed = taskManager.commitAll();
if (committed > 0) {
final long intervalCommitLatency = advanceNowAndComputeLatency();
streamsMetrics.commitTimeSensor.record(intervalCommitLatency / (double) committed, now);
@@ -1063,11 +1063,10 @@ public class StreamThread extends Thread {
lastCommitMs = now;
processStandbyRecords = true;
} else {
- final int commitPerRequested = taskManager.maybeCommitActiveTasksPerUserRequested();
- if (commitPerRequested > 0) {
+ committed = taskManager.maybeCommitActiveTasksPerUserRequested();
+ if (committed > 0) {
final long requestCommitLatency = advanceNowAndComputeLatency();
streamsMetrics.commitTimeSensor.record(requestCommitLatency / (double) committed, now);
- committed += commitPerRequested;
}
}