You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/05/01 17:37:05 UTC
[incubator-pinot] branch master updated: Minor improvements as encountered while studying replica groups (#4180)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e5029b6 Minor improvements as encountered while studying replica groups (#4180)
e5029b6 is described below
commit e5029b6506ef18953dfa8b390ca07c3eb9ea8a6d
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Wed May 1 10:36:59 2019 -0700
Minor improvements as encountered while studying replica groups (#4180)
---
.../broker/routing/RoutingTableBuilderFactory.java | 2 +-
.../builder/GeneratorBasedRoutingTableBuilder.java | 22 +++++-----------------
.../PartitionAwareOfflineRoutingTableBuilder.java | 9 +++------
.../common/partition/PartitionAssignment.java | 4 ++--
.../partition/ReplicaGroupPartitionAssignment.java | 4 ++--
.../org/apache/pinot/common/utils/LLCUtils.java | 11 ++---------
.../ReplicaGroupRebalanceSegmentStrategy.java | 4 ++--
.../ReplicaGroupSegmentAssignmentStrategy.java | 2 +-
.../controller/utils/ReplicaGroupTestUtils.java | 4 ++--
9 files changed, 20 insertions(+), 42 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java
index 8fb0ccf..574ccf5 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java
@@ -116,7 +116,7 @@ public class RoutingTableBuilderFactory {
== CommonConstants.Helix.DataSource.SegmentAssignmentStrategyType.ReplicaGroupSegmentAssignmentStrategy);
// Check that replica group strategy config is correctly set
- boolean hasReplicaGroupStrategyConfig = (validationConfig != null);
+ boolean hasReplicaGroupStrategyConfig = (validationConfig.getReplicaGroupStrategyConfig() != null);
// Check that the table push type is not 'refresh'.
boolean isNotRefreshPush =
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java
index 23e1773..592bd6e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java
@@ -73,7 +73,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
return new ImmutablePair<>(routingTable, variance);
}
- Map<String, List<String>> generateRoutingTable(Map<String, List<String>> segmentToServersMap) {
+ private Map<String, List<String>> generateRoutingTable(Map<String, List<String>> segmentToServersMap) {
Map<String, List<String>> routingTable = new HashMap<>();
@@ -86,11 +86,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
for (Map.Entry<String, List<String>> entry : segmentToServersMap.entrySet()) {
List<String> servers = entry.getValue();
for (String serverName : servers) {
- List<String> segmentsForServer = serverToSegmentsMap.get(serverName);
- if (segmentsForServer == null) {
- segmentsForServer = new ArrayList<>();
- serverToSegmentsMap.put(serverName, segmentsForServer);
- }
+ List<String> segmentsForServer = serverToSegmentsMap.computeIfAbsent(serverName, k -> new ArrayList<>());
segmentsForServer.add(entry.getKey());
}
}
@@ -134,12 +130,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
// Sort all the segments to be used during assignment in ascending order of replicas
PriorityQueue<Pair<String, List<String>>> segmentToReplicaSetQueue =
- new PriorityQueue<>(numSegments, new Comparator<Pair<String, List<String>>>() {
- @Override
- public int compare(Pair<String, List<String>> firstPair, Pair<String, List<String>> secondPair) {
- return Integer.compare(firstPair.getRight().size(), secondPair.getRight().size());
- }
- });
+ new PriorityQueue<>(numSegments, Comparator.comparingInt(pair -> pair.getRight().size()));
for (Map.Entry<String, List<String>> entry : segmentToServersMap.entrySet()) {
// Servers for the segment is the intersection of all servers for this segment and the servers that we have in
@@ -157,11 +148,8 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
List<String> serversForSegment = segmentServersPair.getRight();
String serverWithLeastSegmentsAssigned = getServerWithLeastSegmentsAssigned(serversForSegment, routingTable);
- List<String> segmentsAssignedToServer = routingTable.get(serverWithLeastSegmentsAssigned);
- if (segmentsAssignedToServer == null) {
- segmentsAssignedToServer = new ArrayList<>();
- routingTable.put(serverWithLeastSegmentsAssigned, segmentsAssignedToServer);
- }
+ List<String> segmentsAssignedToServer =
+ routingTable.computeIfAbsent(serverWithLeastSegmentsAssigned, k -> new ArrayList<>());
segmentsAssignedToServer.add(segmentName);
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
index 8ecf14d..19305ea 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
@@ -122,13 +122,10 @@ public class PartitionAwareOfflineRoutingTableBuilder extends BasePartitionAware
for (Integer partitionId : partitionIds) {
for (int replicaId = 0; replicaId < _numReplicas; replicaId++) {
List<String> serversForPartitionAndReplica =
- partitionAssignment.getInstancesfromReplicaGroup(partitionId, replicaId);
+ partitionAssignment.getInstancesFromReplicaGroup(partitionId, replicaId);
for (String serverName : serversForPartitionAndReplica) {
- Map<String, Integer> serverToReplicaMap = partitionToServerToReplicaMap.get(partitionId);
- if (serverToReplicaMap == null) {
- serverToReplicaMap = new HashMap<>();
- partitionToServerToReplicaMap.put(partitionId, serverToReplicaMap);
- }
+ Map<String, Integer> serverToReplicaMap =
+ partitionToServerToReplicaMap.computeIfAbsent(partitionId, k -> new HashMap<>());
serverToReplicaMap.put(serverName, replicaId);
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java
index 1f9522c..ae5a95c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java
@@ -32,8 +32,8 @@ import org.apache.pinot.common.utils.EqualityUtils;
*/
public class PartitionAssignment {
- protected String _tableName;
- protected Map<String, List<String>> _partitionToInstances;
+ private String _tableName;
+ private Map<String, List<String>> _partitionToInstances;
public PartitionAssignment(String tableName) {
_tableName = tableName;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java
index 42e695e..b065793 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java
@@ -79,7 +79,7 @@ public class ReplicaGroupPartitionAssignment extends PartitionAssignment {
public void addInstanceToReplicaGroup(int partition, int replicaGroup, String instanceName) {
String key = createMappingKey(partition, replicaGroup);
if (!getPartitionToInstances().containsKey(key)) {
- addPartition(key, new ArrayList<String>());
+ addPartition(key, new ArrayList<>());
}
getInstancesListForPartition(key).add(instanceName);
}
@@ -91,7 +91,7 @@ public class ReplicaGroupPartitionAssignment extends PartitionAssignment {
* @param replicaGroup Replica group number
* @return List of instances belongs to the given partition and replica group
*/
- public List<String> getInstancesfromReplicaGroup(int partition, int replicaGroup) {
+ public List<String> getInstancesFromReplicaGroup(int partition, int replicaGroup) {
String key = createMappingKey(partition, replicaGroup);
if (!getPartitionToInstances().containsKey(key)) {
throw new NoSuchElementException();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java
index 2189653..3adbb90 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java
@@ -42,15 +42,8 @@ public class LLCUtils {
final LLCSegmentName segmentName = new LLCSegmentName(segment);
String streamPartitionId = segmentName.getPartitionRange();
- SortedSet<SegmentName> segmentsForPartition = sortedSegmentsByStreamPartition.get(streamPartitionId);
-
- // Create sorted set if necessary
- if (segmentsForPartition == null) {
- segmentsForPartition = new TreeSet<>();
-
- sortedSegmentsByStreamPartition.put(streamPartitionId, segmentsForPartition);
- }
-
+ SortedSet<SegmentName> segmentsForPartition =
+ sortedSegmentsByStreamPartition.computeIfAbsent(streamPartitionId, k -> new TreeSet<>());
segmentsForPartition.add(segmentName);
}
return sortedSegmentsByStreamPartition;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
index b472df3..a75f334 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
@@ -206,7 +206,7 @@ public class ReplicaGroupRebalanceSegmentStrategy implements RebalanceSegmentStr
int currentNewReplicaGroupId = 0;
for (int groupId = 0; groupId < oldNumReplicaGroup; groupId++) {
List<String> oldReplicaGroup =
- oldReplicaGroupPartitionAssignment.getInstancesfromReplicaGroup(partitionId, groupId);
+ oldReplicaGroupPartitionAssignment.getInstancesFromReplicaGroup(partitionId, groupId);
List<String> newReplicaGroup = new ArrayList<>();
boolean removeGroup = false;
@@ -373,7 +373,7 @@ public class ReplicaGroupRebalanceSegmentStrategy implements RebalanceSegmentStr
List<String> referenceReplicaGroup = new ArrayList<>();
for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
List<String> serversInReplicaGroup =
- replicaGroupPartitionAssignment.getInstancesfromReplicaGroup(partitionId, replicaId);
+ replicaGroupPartitionAssignment.getInstancesFromReplicaGroup(partitionId, replicaId);
if (replicaId == 0) {
// We need to keep the first replica group in case of mirroring.
referenceReplicaGroup.addAll(serversInReplicaGroup);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java
index 577e0f7..cd604f5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java
@@ -84,7 +84,7 @@ public class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentS
int index = 0;
for (int groupId = 0; groupId < numReplicas; groupId++) {
List<String> instancesInReplicaGroup =
- replicaGroupPartitionAssignment.getInstancesfromReplicaGroup(partitionNumber, groupId);
+ replicaGroupPartitionAssignment.getInstancesFromReplicaGroup(partitionNumber, groupId);
int numInstances = instancesInReplicaGroup.size();
if (mirrorAssignmentAcrossReplicaGroups) {
// Randomly pick the index and use the same index for all replica groups.
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java
index e1bf7c2..31466d6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java
@@ -98,7 +98,7 @@ public class ReplicaGroupTestUtils {
// Check if the servers in a replica group covers all segments
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
- List<String> replicaGroup = replicaGroupMapping.getInstancesfromReplicaGroup(partitionId, replicaId);
+ List<String> replicaGroup = replicaGroupMapping.getInstancesFromReplicaGroup(partitionId, replicaId);
Set<String> replicaGroupSegments = new HashSet<>();
for (String server : replicaGroup) {
for (String segment : serverToSegments.get(server)) {
@@ -124,7 +124,7 @@ public class ReplicaGroupTestUtils {
for (int serverIndex = 0; serverIndex < replicaGroupConfig.getNumInstancesPerPartition(); serverIndex++) {
Set<String> mirrorSegments = new HashSet<>();
for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
- List<String> replicaGroup = replicaGroupMapping.getInstancesfromReplicaGroup(partitionId, replicaId);
+ List<String> replicaGroup = replicaGroupMapping.getInstancesFromReplicaGroup(partitionId, replicaId);
String server = replicaGroup.get(serverIndex);
Set<String> currentSegments = new HashSet<>(serverToSegments.get(server));
mirrorSegments.addAll(currentSegments);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org