You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/09 20:16:58 UTC
[kafka] branch trunk updated: MINOR: Remove redundant `if` condition. (#5697)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4be79b6 MINOR: Remove redundant `if` condition. (#5697)
4be79b6 is described below
commit 4be79b6ceef8310ef8ab78f274f889ced3ab2532
Author: Kamal Chandraprakash <ka...@gmail.com>
AuthorDate: Wed Oct 10 01:46:49 2018 +0530
MINOR: Remove redundant `if` condition. (#5697)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
.../internals/assignment/StickyTaskAssignor.java | 29 +++++++++-------------
1 file changed, 12 insertions(+), 17 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 7b33813..69c2ba2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -117,7 +117,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
}
private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) {
- final ClientState client = findClient(taskId, clientsWithin, active);
+ final ClientState client = findClient(taskId, clientsWithin);
taskPairs.addPairs(taskId, client.assignedTasks());
client.assign(taskId, active);
}
@@ -139,7 +139,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
}
- private ClientState findClient(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) {
+ private ClientState findClient(final TaskId taskId, final Set<ID> clientsWithin) {
// optimize the case where there is only 1 id to search within.
if (clientsWithin.size() == 1) {
@@ -148,14 +148,14 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
final ClientState previous = findClientsWithPreviousAssignedTask(taskId, clientsWithin);
if (previous == null) {
- return leastLoaded(taskId, clientsWithin, active);
+ return leastLoaded(taskId, clientsWithin);
}
if (shouldBalanceLoad(previous)) {
final ClientState standby = findLeastLoadedClientWithPreviousStandByTask(taskId, clientsWithin);
if (standby == null
|| shouldBalanceLoad(standby)) {
- return leastLoaded(taskId, clientsWithin, active);
+ return leastLoaded(taskId, clientsWithin);
}
return standby;
}
@@ -192,21 +192,20 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
}
final HashSet<ID> constrainTo = new HashSet<>(ids);
constrainTo.retainAll(clientsWithin);
- return leastLoaded(taskId, constrainTo, false);
+ return leastLoaded(taskId, constrainTo);
}
- private ClientState leastLoaded(final TaskId taskId, final Set<ID> clientIds, final boolean active) {
- final ClientState leastLoaded = findLeastLoaded(taskId, clientIds, true, active);
+ private ClientState leastLoaded(final TaskId taskId, final Set<ID> clientIds) {
+ final ClientState leastLoaded = findLeastLoaded(taskId, clientIds, true);
if (leastLoaded == null) {
- return findLeastLoaded(taskId, clientIds, false, active);
+ return findLeastLoaded(taskId, clientIds, false);
}
return leastLoaded;
}
private ClientState findLeastLoaded(final TaskId taskId,
final Set<ID> clientIds,
- final boolean checkTaskPairs,
- final boolean active) {
+ final boolean checkTaskPairs) {
ClientState leastLoaded = null;
for (final ID id : clientIds) {
final ClientState client = clients.get(id);
@@ -217,7 +216,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
if (leastLoaded == null || client.hasMoreAvailableCapacityThan(leastLoaded)) {
if (!checkTaskPairs) {
leastLoaded = client;
- } else if (taskPairs.hasNewPair(taskId, client.assignedTasks(), active)) {
+ } else if (taskPairs.hasNewPair(taskId, client.assignedTasks())) {
leastLoaded = client;
}
}
@@ -235,7 +234,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
for (final TaskId prevAssignedTask : clientState.getValue().previousStandbyTasks()) {
if (!previousStandbyTaskAssignment.containsKey(prevAssignedTask)) {
- previousStandbyTaskAssignment.put(prevAssignedTask, new HashSet<ID>());
+ previousStandbyTaskAssignment.put(prevAssignedTask, new HashSet<>());
}
previousStandbyTaskAssignment.get(prevAssignedTask).add(clientState.getKey());
}
@@ -261,15 +260,11 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
}
boolean hasNewPair(final TaskId task1,
- final Set<TaskId> taskIds,
- final boolean active) {
+ final Set<TaskId> taskIds) {
if (pairs.size() == maxPairs) {
return false;
}
for (final TaskId taskId : taskIds) {
- if (!active && !pairs.contains(pair(task1, taskId))) {
- return true;
- }
if (!pairs.contains(pair(task1, taskId))) {
return true;
}