You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2023/04/18 22:52:50 UTC
[helix] branch master updated: Code cleanup and improvement with modern java syntax (#2449)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 55841c19b Code cleanup and improvement with modern java syntax (#2449)
55841c19b is described below
commit 55841c19bfb657f0bbdb01ef4d71e23753068f8a
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Tue Apr 18 18:52:44 2023 -0400
Code cleanup and improvement with modern java syntax (#2449)
Mostly the change is on replacing stream().forEach() with collection forEach()
---
.../helix/common/caches/CustomizedStateCache.java | 4 ++--
.../helix/common/caches/InstanceMessagesCache.java | 8 +++----
.../trimmer/HelixPropertyTrimmer.java | 10 ++++----
.../rebalancer/waged/WagedRebalancer.java | 6 ++---
.../constraints/ConstraintBasedAlgorithm.java | 7 +++---
.../rebalancer/waged/model/ClusterContext.java | 15 +++++-------
.../waged/model/ClusterModelProvider.java | 25 ++++++++++----------
.../controller/stages/CurrentStateOutput.java | 5 +---
.../stages/IntermediateStateCalcStage.java | 11 ++++-----
.../java/org/apache/helix/model/ClusterConfig.java | 27 ++++++++++------------
.../org/apache/helix/model/InstanceConfig.java | 11 ++++-----
.../zookeeper/zkclient/metric/ZkClientMonitor.java | 2 +-
12 files changed, 58 insertions(+), 73 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedStateCache.java
index ba7895368..f79c502a1 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedStateCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedStateCache.java
@@ -54,8 +54,8 @@ public class CustomizedStateCache extends ParticipantStateCache<CustomizedState>
for (String instanceName : liveInstanceMap.keySet()) {
for (String customizedStateType : _aggregationEnabledTypes) {
accessor.getChildNames(keyBuilder.customizedStates(instanceName, customizedStateType))
- .stream().forEach(resourceName -> participantStateKeys
- .add(keyBuilder.customizedState(instanceName, customizedStateType, resourceName)));
+ .forEach(resourceName ->
+ participantStateKeys.add(keyBuilder.customizedState(instanceName, customizedStateType, resourceName)));
}
}
return participantStateKeys;
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index fbd025ea8..c06837349 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -554,10 +554,10 @@ public class InstanceMessagesCache {
}
}
- toRemoveMessages.entrySet().stream().forEach(entry -> {
- entry.getValue().stream().forEach(id -> _staleMessageCache.get(entry.getKey()).remove(id));
- if (_staleMessageCache.get(entry.getKey()).size() == 0) {
- _staleMessageCache.remove(entry.getKey());
+ toRemoveMessages.forEach((key, value) -> {
+ value.forEach(id -> _staleMessageCache.get(key).remove(id));
+ if (_staleMessageCache.get(key).size() == 0) {
+ _staleMessageCache.remove(key);
}
});
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/HelixPropertyTrimmer.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/HelixPropertyTrimmer.java
index dbab09ccc..35ef1d41b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/HelixPropertyTrimmer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/HelixPropertyTrimmer.java
@@ -107,7 +107,7 @@ public abstract class HelixPropertyTrimmer<T extends HelixProperty> {
}
switch (fieldType) {
case SIMPLE_FIELD:
- fieldKeySet.stream().forEach(fieldKey -> {
+ fieldKeySet.forEach(fieldKey -> {
if (originalZNRecord.getSimpleFields().containsKey(fieldKey)) {
trimmedZNRecord.getSimpleFields().putIfAbsent(fieldKey,
trimValue ? null : originalZNRecord.getSimpleField(fieldKey));
@@ -115,18 +115,18 @@ public abstract class HelixPropertyTrimmer<T extends HelixProperty> {
});
break;
case LIST_FIELD:
- fieldKeySet.stream().forEach(fieldKey -> {
+ fieldKeySet.forEach(fieldKey -> {
if (originalZNRecord.getListFields().containsKey(fieldKey)) {
trimmedZNRecord.getListFields().putIfAbsent(fieldKey,
- trimValue ? Collections.EMPTY_LIST : originalZNRecord.getListField(fieldKey));
+ trimValue ? Collections.emptyList() : originalZNRecord.getListField(fieldKey));
}
});
break;
case MAP_FIELD:
- fieldKeySet.stream().forEach(fieldKey -> {
+ fieldKeySet.forEach(fieldKey -> {
if (originalZNRecord.getMapFields().containsKey(fieldKey)) {
trimmedZNRecord.getMapFields().putIfAbsent(fieldKey,
- trimValue ? Collections.EMPTY_MAP : originalZNRecord.getMapField(fieldKey));
+ trimValue ? Collections.emptyMap() : originalZNRecord.getMapField(fieldKey));
}
});
break;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index b43ce4b68..494baef01 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.rebalancer.waged;
*/
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -79,9 +78,8 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
ConstraintBasedAlgorithmFactory
.getInstance(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
// These failure types should be propagated to caller of computeNewIdealStates()
- private static final List<HelixRebalanceException.Type> FAILURE_TYPES_TO_PROPAGATE = Collections
- .unmodifiableList(Arrays.asList(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS,
- HelixRebalanceException.Type.UNKNOWN_FAILURE));
+ private static final List<HelixRebalanceException.Type> FAILURE_TYPES_TO_PROPAGATE =
+ List.of(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, HelixRebalanceException.Type.UNKNOWN_FAILURE);
private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 9065869cd..d7dffaa4d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -91,8 +91,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
// Create a wrapper for each AssignableReplica.
List<AssignableReplicaWithScore> toBeAssignedReplicas =
clusterModel.getAssignableReplicaMap().values().stream().flatMap(Collection::stream).map(
- replica -> new AssignableReplicaWithScore(replica, clusterModel,
- positiveEstimateClusterRemainCap)).sorted()
+ replica -> new AssignableReplicaWithScore(replica, clusterModel, positiveEstimateClusterRemainCap)).sorted()
.collect(Collectors.toList());
for (AssignableReplicaWithScore replicaWithScore : toBeAssignedReplicas) {
@@ -101,7 +100,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), busyInstances,
optimalAssignment);
// stop immediately if any replica cannot find best assignable node
- if (!maybeBestNode.isPresent() || optimalAssignment.hasAnyFailure()) {
+ if (maybeBestNode.isEmpty() || optimalAssignment.hasAnyFailure()) {
String errorMessage = String.format(
"Unable to find any available candidate node for partition %s; Fail reasons: %s",
replica.getPartitionName(), optimalAssignment.getFailures());
@@ -180,7 +179,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
.collect(Collectors.toList());
}
- private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
+ private static class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
private final AssignableReplica _replica;
private float _score = 0;
private final boolean _isInBestPossibleAssignment;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 24916e98d..07e33df16 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -84,18 +84,15 @@ public class ClusterContext {
for (AssignableReplica replica : entry.getValue()) {
if (replica.isReplicaTopState()) {
totalTopStateReplicas += 1;
- replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalTopStateUsage
- .compute(capacityEntry.getKey(), (k, v) -> (v == null) ? capacityEntry.getValue()
- : (v + capacityEntry.getValue())));
+ replica.getCapacity().forEach(
+ (key, value) -> totalTopStateUsage.compute(key, (k, v) -> (v == null) ? value : (v + value)));
}
- replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalUsage
- .compute(capacityEntry.getKey(),
- (k, v) -> (v == null) ? capacityEntry.getValue() : (v + capacityEntry.getValue())));
+ replica.getCapacity().forEach(
+ (key, value) -> totalUsage.compute(key, (k, v) -> (v == null) ? value : (v + value)));
}
}
- nodeSet.stream().forEach(node -> node.getMaxCapacity().entrySet().stream().forEach(
- capacityEntry -> totalCapacity.compute(capacityEntry.getKey(),
- (k, v) -> (v == null) ? capacityEntry.getValue() : (v + capacityEntry.getValue()))));
+ nodeSet.forEach(node -> node.getMaxCapacity().forEach(
+ (key, value) -> totalCapacity.compute(key, (k, v) -> (v == null) ? value : (v + value))));
// TODO: these variables correspond to one constraint each, and may become unnecessary if the
// constraints are not used. A better design is to make them pluggable.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 7dab730e8..dfc648aa7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -36,6 +36,7 @@ import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
@@ -117,7 +118,7 @@ public class ClusterModelProvider {
* @param baselineAssignment The persisted Baseline assignment.
* @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
* previous rebalance.
- * @return
+ * @return the new cluster model
*/
public static ClusterModel generateClusterModelForPartialRebalance(
ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
@@ -498,8 +499,8 @@ public class ClusterModelProvider {
private static Map<String, Map<String, Set<String>>> getValidStateInstanceMap(
ResourceAssignment assignment, Set<String> activeInstances) {
Map<String, Map<String, Set<String>>> stateInstanceMap = getStateInstanceMap(assignment);
- stateInstanceMap.values().stream().forEach(stateMap -> stateMap.values().stream()
- .forEach(instanceSet -> instanceSet.retainAll(activeInstances)));
+ stateInstanceMap.values().forEach(stateMap ->
+ stateMap.values().forEach(instanceSet -> instanceSet.retainAll(activeInstances)));
return stateInstanceMap;
}
@@ -510,12 +511,10 @@ public class ClusterModelProvider {
return Collections.emptyMap();
}
return assignment.getMappedPartitions().stream()
- .collect(Collectors.toMap(partition -> partition.getPartitionName(), partition -> {
+ .collect(Collectors.toMap(Partition::getPartitionName, partition -> {
Map<String, Set<String>> stateInstanceMap = new HashMap<>();
- assignment.getReplicaMap(partition).entrySet().stream().forEach(
- stateMapEntry -> stateInstanceMap
- .computeIfAbsent(stateMapEntry.getValue(), key -> new HashSet<>())
- .add(stateMapEntry.getKey()));
+ assignment.getReplicaMap(partition)
+ .forEach((key1, value) -> stateInstanceMap.computeIfAbsent(value, key -> new HashSet<>()).add(key1));
return stateInstanceMap;
}));
}
@@ -532,7 +531,7 @@ public class ClusterModelProvider {
private static Set<AssignableNode> getAllAssignableNodes(ClusterConfig clusterConfig,
Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances) {
return activeInstances.parallelStream()
- .filter(instance -> instanceConfigMap.containsKey(instance)).map(
+ .filter(instanceConfigMap::containsKey).map(
instanceName -> new AssignableNode(clusterConfig, instanceConfigMap.get(instanceName),
instanceName)).collect(Collectors.toSet());
}
@@ -549,7 +548,7 @@ public class ClusterModelProvider {
ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
Set<AssignableNode> assignableNodes) {
ClusterConfig clusterConfig = dataProvider.getClusterConfig();
- int activeFaultZoneCount = assignableNodes.stream().map(node -> node.getFaultZone())
+ int activeFaultZoneCount = assignableNodes.stream().map(AssignableNode::getFaultZone)
.collect(Collectors.toSet()).size();
return resourceMap.keySet().parallelStream().map(resourceName -> {
ResourceConfig resourceConfig = dataProvider.getResourceConfig(resourceName);
@@ -582,8 +581,8 @@ public class ClusterModelProvider {
}
}
}
- return new HashMap.SimpleEntry<>(resourceName, replicas);
- }).collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
+ return Map.entry(resourceName, replicas);
+ }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
/**
@@ -592,7 +591,7 @@ public class ClusterModelProvider {
private static Map<String, Map<String, Set<String>>> mapAssignmentToFaultZone(
Set<AssignableNode> assignableNodes) {
Map<String, Map<String, Set<String>>> faultZoneAssignmentMap = new HashMap<>();
- assignableNodes.stream().forEach(node -> {
+ assignableNodes.forEach(node -> {
for (Map.Entry<String, Set<String>> resourceMap : node.getAssignedPartitionsMap()
.entrySet()) {
faultZoneAssignmentMap.computeIfAbsent(node.getFaultZone(), k -> new HashMap<>())
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index a81fd2cd4..0ef47c8e7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -453,10 +453,7 @@ public class CurrentStateOutput {
getCurrentStateMap(resourceName);
if (!currentStateMap.isEmpty()) {
ResourceAssignment newResourceAssignment = new ResourceAssignment(resourceName);
- currentStateMap.entrySet().stream().forEach(currentStateEntry -> {
- newResourceAssignment.addReplicaMap(currentStateEntry.getKey(),
- currentStateEntry.getValue());
- });
+ currentStateMap.forEach(newResourceAssignment::addReplicaMap);
currentStateAssignment.put(resourceName, newResourceAssignment);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index d16d6df4c..dbd433b3e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -865,18 +865,17 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
Map<Partition, Map<String, Message>> pendingMessageMap,
Map<Partition, List<Message>> resourceMessageMap) {
for (Map.Entry<Partition, Map<String, Message>> entry : pendingMessageMap.entrySet()) {
- entry.getValue().entrySet().stream().forEach(e -> {
- if (!e.getValue().getToState().equals(HelixDefinedState.DROPPED.name())) {
- intermediateStateMap
- .setState(entry.getKey(), e.getValue().getTgtName(), e.getValue().getToState());
+ entry.getValue().forEach((key, value) -> {
+ if (!value.getToState().equals(HelixDefinedState.DROPPED.name())) {
+ intermediateStateMap.setState(entry.getKey(), value.getTgtName(), value.getToState());
} else {
- intermediateStateMap.getStateMap().get(entry.getKey()).remove(e.getValue().getTgtName());
+ intermediateStateMap.getStateMap().get(entry.getKey()).remove(value.getTgtName());
}
});
}
for (Map.Entry<Partition, List<Message>> entry : resourceMessageMap.entrySet()) {
- entry.getValue().stream().forEach(e -> {
+ entry.getValue().forEach(e -> {
if (!e.getToState().equals(HelixDefinedState.DROPPED.name())) {
intermediateStateMap.setState(entry.getKey(), e.getTgtName(), e.getToState());
} else {
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index c4f9f914f..8f04c5fab 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -934,7 +934,7 @@ public class ClusterConfig extends HelixProperty {
Map<String, String> capacityData = _record.getMapField(capacityPropertyType.name());
if (capacityData != null) {
return capacityData.entrySet().stream().collect(
- Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue())));
+ Collectors.toMap(Map.Entry::getKey, entry -> Integer.parseInt(entry.getValue())));
}
return Collections.emptyMap();
}
@@ -945,13 +945,12 @@ public class ClusterConfig extends HelixProperty {
_record.getMapFields().remove(capacityPropertyType.name());
} else {
Map<String, String> data = new HashMap<>();
- capacityDataMap.entrySet().stream().forEach(entry -> {
- if (entry.getValue() < 0) {
- throw new IllegalArgumentException(String
- .format("Default capacity data contains a negative value: %s = %d", entry.getKey(),
- entry.getValue()));
+ capacityDataMap.forEach((key, value) -> {
+ if (value < 0) {
+ throw new IllegalArgumentException(
+ String.format("Default capacity data contains a negative value: %s = %d", key, value));
}
- data.put(entry.getKey(), Integer.toString(entry.getValue()));
+ data.put(key, Integer.toString(value));
});
_record.setMapField(capacityPropertyType.name(), data);
}
@@ -976,14 +975,12 @@ public class ClusterConfig extends HelixProperty {
+ "GlobalRebalancePreferenceKey.LESS_MOVEMENT must be both specified or not specified");
}
Map<String, String> preferenceMap = new HashMap<>();
- preference.entrySet().stream().forEach(entry -> {
- if (entry.getValue() > MAX_REBALANCE_PREFERENCE
- || entry.getValue() < MIN_REBALANCE_PREFERENCE) {
- throw new IllegalArgumentException(String
- .format("Invalid global rebalance preference configuration. Key %s, Value %d.",
- entry.getKey().name(), entry.getValue()));
+ preference.forEach((key, value) -> {
+ if (value > MAX_REBALANCE_PREFERENCE || value < MIN_REBALANCE_PREFERENCE) {
+ throw new IllegalArgumentException(
+ String.format("Invalid global rebalance preference configuration. Key %s, Value %d.", key.name(), value));
}
- preferenceMap.put(entry.getKey().name(), Integer.toString(entry.getValue()));
+ preferenceMap.put(key.name(), Integer.toString(value));
});
_record.setMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name(), preferenceMap);
}
@@ -1100,7 +1097,7 @@ public class ClusterConfig extends HelixProperty {
public Map<String, String> getAbnormalStateResolverMap() {
Map<String, String> resolverMap =
_record.getMapField(ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name());
- return resolverMap == null ? Collections.EMPTY_MAP : resolverMap;
+ return resolverMap == null ? Collections.emptyMap() : resolverMap;
}
/**
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index da5e42460..01c1f682f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -626,13 +626,12 @@ public class InstanceConfig extends HelixProperty {
_record.getMapFields().remove(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name());
} else {
Map<String, String> capacityData = new HashMap<>();
- capacityDataMap.entrySet().stream().forEach(entry -> {
- if (entry.getValue() < 0) {
- throw new IllegalArgumentException(String
- .format("Capacity Data contains a negative value: %s = %d", entry.getKey(),
- entry.getValue()));
+ capacityDataMap.forEach((key, value) -> {
+ if (value < 0) {
+ throw new IllegalArgumentException(
+ String.format("Capacity Data contains a negative value: %s = %d", key, value));
}
- capacityData.put(entry.getKey(), Integer.toString(entry.getValue()));
+ capacityData.put(key, Integer.toString(value));
});
_record.setMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityData);
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
index d0a37bb6e..13231c842 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
@@ -134,7 +134,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
}
doRegister(attributeList, MBEAN_DESCRIPTION,
getObjectName(_monitorType, _monitorKey, _monitorInstanceName));
- _zkClientPathMonitorMap.values().stream().forEach( monitor -> {
+ _zkClientPathMonitorMap.values().forEach(monitor -> {
if (monitor != null) {
try {
monitor.register();