You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to dev@druid.apache.org by jo...@apache.org on 2018/07/12 03:19:15 UTC
[incubator-druid] branch master updated: Coordinator fix balancer stuck (#5987)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 31c2179 Coordinator fix balancer stuck (#5987)
31c2179 is described below
commit 31c2179fe1daa564e918f4ba64937743a45132a1
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Wed Jul 11 20:19:11 2018 -0700
Coordinator fix balancer stuck (#5987)
* this will fix it
* filter destinations to not consider servers already serving segment
* fix it
* cleanup
* fix opposite day in ImmutableDruidServer.equals
* simplify
---
.../java/io/druid/client/ImmutableDruidServer.java | 6 +--
.../helper/DruidCoordinatorBalancer.java | 56 ++++++++++++++++------
2 files changed, 43 insertions(+), 19 deletions(-)
diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java
index ef4fc66..22d5e71 100644
--- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java
+++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java
@@ -142,11 +142,7 @@ public class ImmutableDruidServer
ImmutableDruidServer that = (ImmutableDruidServer) o;
- if (metadata.equals(that.metadata)) {
- return false;
- }
-
- return true;
+ return metadata.equals(that.metadata);
}
@Override
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index 23f8dcb..8547954 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -93,9 +93,11 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
CoordinatorStats stats
)
{
- final BalancerStrategy strategy = params.getBalancerStrategy();
- final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
+ if (params.getAvailableSegments().size() == 0) {
+ log.info("Metadata segments are not available. Cannot balance.");
+ return;
+ }
currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());
if (!currentlyMovingSegments.get(tier).isEmpty()) {
@@ -117,33 +119,59 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
numSegments += sourceHolder.getServer().getSegments().size();
}
+
if (numSegments == 0) {
log.info("No segments found. Cannot balance.");
return;
}
+ final BalancerStrategy strategy = params.getBalancerStrategy();
+ final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments);
+ final int maxIterations = 2 * maxSegmentsToMove;
final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
long unmoved = 0L;
- for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) {
- final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom);
- if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
- final List<ServerHolder> toMoveToWithLoadQueueCapacity =
+ for (int moved = 0, iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
+ final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom);
+
+ if (segmentToMoveHolder != null && params.getAvailableSegments().contains(segmentToMoveHolder.getSegment())) {
+ final DataSegment segmentToMove = segmentToMoveHolder.getSegment();
+ final ImmutableDruidServer fromServer = segmentToMoveHolder.getFromServer();
+ // we want to leave the server the segment is currently on in the list...
+ // but filter out replicas that are already serving the segment, and servers with a full load queue
+ final List<ServerHolder> toMoveToWithLoadQueueCapacityAndNotServingSegment =
toMoveTo.stream()
- .filter(s -> maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad)
+ .filter(s -> s.getServer().equals(fromServer) ||
+ (!s.isServingSegment(segmentToMove) &&
+ (maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad)))
.collect(Collectors.toList());
- final ServerHolder destinationHolder =
- strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveToWithLoadQueueCapacity);
-
- if (destinationHolder != null) {
- moveSegment(segmentToMove, destinationHolder.getServer(), params);
- moved++;
+ if (toMoveToWithLoadQueueCapacityAndNotServingSegment.size() > 0) {
+ final ServerHolder destinationHolder =
+ strategy.findNewSegmentHomeBalancer(segmentToMove, toMoveToWithLoadQueueCapacityAndNotServingSegment);
+
+ if (destinationHolder != null && !destinationHolder.getServer().equals(fromServer)) {
+ moveSegment(segmentToMoveHolder, destinationHolder.getServer(), params);
+ moved++;
+ } else {
+ log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getIdentifier());
+ unmoved++;
+ }
} else {
- log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier());
+ log.info(
+ "No valid movement destinations for segment [%s].",
+ segmentToMove.getIdentifier()
+ );
unmoved++;
}
}
+ if (iter >= maxIterations) {
+ log.info(
+ "Unable to select %d remaining candidate segments out of %d total to balance after %d iterations, ending run.",
+ (maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter
+ );
+ break;
+ }
}
if (unmoved == maxSegmentsToMove) {
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org