You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/05/01 17:37:05 UTC

[incubator-pinot] branch master updated: Minor improvements as encountered while studying replica groups (#4180)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e5029b6  Minor improvements as encountered while studying replica groups (#4180)
e5029b6 is described below

commit e5029b6506ef18953dfa8b390ca07c3eb9ea8a6d
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Wed May 1 10:36:59 2019 -0700

    Minor improvements as encountered while studying replica groups (#4180)
---
 .../broker/routing/RoutingTableBuilderFactory.java |  2 +-
 .../builder/GeneratorBasedRoutingTableBuilder.java | 22 +++++-----------------
 .../PartitionAwareOfflineRoutingTableBuilder.java  |  9 +++------
 .../common/partition/PartitionAssignment.java      |  4 ++--
 .../partition/ReplicaGroupPartitionAssignment.java |  4 ++--
 .../org/apache/pinot/common/utils/LLCUtils.java    | 11 ++---------
 .../ReplicaGroupRebalanceSegmentStrategy.java      |  4 ++--
 .../ReplicaGroupSegmentAssignmentStrategy.java     |  2 +-
 .../controller/utils/ReplicaGroupTestUtils.java    |  4 ++--
 9 files changed, 20 insertions(+), 42 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java
index 8fb0ccf..574ccf5 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java
@@ -116,7 +116,7 @@ public class RoutingTableBuilderFactory {
                 == CommonConstants.Helix.DataSource.SegmentAssignmentStrategyType.ReplicaGroupSegmentAssignmentStrategy);
 
         // Check that replica group strategy config is correctly set
-        boolean hasReplicaGroupStrategyConfig = (validationConfig != null);
+        boolean hasReplicaGroupStrategyConfig = (validationConfig.getReplicaGroupStrategyConfig() != null);
 
         // Check that the table push type is not 'refresh'.
         boolean isNotRefreshPush =
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java
index 23e1773..592bd6e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java
@@ -73,7 +73,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
     return new ImmutablePair<>(routingTable, variance);
   }
 
-  Map<String, List<String>> generateRoutingTable(Map<String, List<String>> segmentToServersMap) {
+  private Map<String, List<String>> generateRoutingTable(Map<String, List<String>> segmentToServersMap) {
 
     Map<String, List<String>> routingTable = new HashMap<>();
 
@@ -86,11 +86,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
     for (Map.Entry<String, List<String>> entry : segmentToServersMap.entrySet()) {
       List<String> servers = entry.getValue();
       for (String serverName : servers) {
-        List<String> segmentsForServer = serverToSegmentsMap.get(serverName);
-        if (segmentsForServer == null) {
-          segmentsForServer = new ArrayList<>();
-          serverToSegmentsMap.put(serverName, segmentsForServer);
-        }
+        List<String> segmentsForServer = serverToSegmentsMap.computeIfAbsent(serverName, k -> new ArrayList<>());
         segmentsForServer.add(entry.getKey());
       }
     }
@@ -134,12 +130,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
 
     // Sort all the segments to be used during assignment in ascending order of replicas
     PriorityQueue<Pair<String, List<String>>> segmentToReplicaSetQueue =
-        new PriorityQueue<>(numSegments, new Comparator<Pair<String, List<String>>>() {
-          @Override
-          public int compare(Pair<String, List<String>> firstPair, Pair<String, List<String>> secondPair) {
-            return Integer.compare(firstPair.getRight().size(), secondPair.getRight().size());
-          }
-        });
+        new PriorityQueue<>(numSegments, Comparator.comparingInt(pair -> pair.getRight().size()));
 
     for (Map.Entry<String, List<String>> entry : segmentToServersMap.entrySet()) {
       // Servers for the segment is the intersection of all servers for this segment and the servers that we have in
@@ -157,11 +148,8 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
       List<String> serversForSegment = segmentServersPair.getRight();
 
       String serverWithLeastSegmentsAssigned = getServerWithLeastSegmentsAssigned(serversForSegment, routingTable);
-      List<String> segmentsAssignedToServer = routingTable.get(serverWithLeastSegmentsAssigned);
-      if (segmentsAssignedToServer == null) {
-        segmentsAssignedToServer = new ArrayList<>();
-        routingTable.put(serverWithLeastSegmentsAssigned, segmentsAssignedToServer);
-      }
+      List<String> segmentsAssignedToServer =
+          routingTable.computeIfAbsent(serverWithLeastSegmentsAssigned, k -> new ArrayList<>());
       segmentsAssignedToServer.add(segmentName);
     }
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
index 8ecf14d..19305ea 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
@@ -122,13 +122,10 @@ public class PartitionAwareOfflineRoutingTableBuilder extends BasePartitionAware
     for (Integer partitionId : partitionIds) {
       for (int replicaId = 0; replicaId < _numReplicas; replicaId++) {
         List<String> serversForPartitionAndReplica =
-            partitionAssignment.getInstancesfromReplicaGroup(partitionId, replicaId);
+            partitionAssignment.getInstancesFromReplicaGroup(partitionId, replicaId);
         for (String serverName : serversForPartitionAndReplica) {
-          Map<String, Integer> serverToReplicaMap = partitionToServerToReplicaMap.get(partitionId);
-          if (serverToReplicaMap == null) {
-            serverToReplicaMap = new HashMap<>();
-            partitionToServerToReplicaMap.put(partitionId, serverToReplicaMap);
-          }
+          Map<String, Integer> serverToReplicaMap =
+              partitionToServerToReplicaMap.computeIfAbsent(partitionId, k -> new HashMap<>());
           serverToReplicaMap.put(serverName, replicaId);
         }
       }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java
index 1f9522c..ae5a95c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java
@@ -32,8 +32,8 @@ import org.apache.pinot.common.utils.EqualityUtils;
  */
 public class PartitionAssignment {
 
-  protected String _tableName;
-  protected Map<String, List<String>> _partitionToInstances;
+  private String _tableName;
+  private Map<String, List<String>> _partitionToInstances;
 
   public PartitionAssignment(String tableName) {
     _tableName = tableName;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java
index 42e695e..b065793 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java
@@ -79,7 +79,7 @@ public class ReplicaGroupPartitionAssignment extends PartitionAssignment {
   public void addInstanceToReplicaGroup(int partition, int replicaGroup, String instanceName) {
     String key = createMappingKey(partition, replicaGroup);
     if (!getPartitionToInstances().containsKey(key)) {
-      addPartition(key, new ArrayList<String>());
+      addPartition(key, new ArrayList<>());
     }
     getInstancesListForPartition(key).add(instanceName);
   }
@@ -91,7 +91,7 @@ public class ReplicaGroupPartitionAssignment extends PartitionAssignment {
    * @param replicaGroup Replica group number
    * @return List of instances belongs to the given partition and replica group
    */
-  public List<String> getInstancesfromReplicaGroup(int partition, int replicaGroup) {
+  public List<String> getInstancesFromReplicaGroup(int partition, int replicaGroup) {
     String key = createMappingKey(partition, replicaGroup);
     if (!getPartitionToInstances().containsKey(key)) {
       throw new NoSuchElementException();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java
index 2189653..3adbb90 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java
@@ -42,15 +42,8 @@ public class LLCUtils {
 
       final LLCSegmentName segmentName = new LLCSegmentName(segment);
       String streamPartitionId = segmentName.getPartitionRange();
-      SortedSet<SegmentName> segmentsForPartition = sortedSegmentsByStreamPartition.get(streamPartitionId);
-
-      // Create sorted set if necessary
-      if (segmentsForPartition == null) {
-        segmentsForPartition = new TreeSet<>();
-
-        sortedSegmentsByStreamPartition.put(streamPartitionId, segmentsForPartition);
-      }
-
+      SortedSet<SegmentName> segmentsForPartition =
+          sortedSegmentsByStreamPartition.computeIfAbsent(streamPartitionId, k -> new TreeSet<>());
       segmentsForPartition.add(segmentName);
     }
     return sortedSegmentsByStreamPartition;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
index b472df3..a75f334 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java
@@ -206,7 +206,7 @@ public class ReplicaGroupRebalanceSegmentStrategy implements RebalanceSegmentStr
       int currentNewReplicaGroupId = 0;
       for (int groupId = 0; groupId < oldNumReplicaGroup; groupId++) {
         List<String> oldReplicaGroup =
-            oldReplicaGroupPartitionAssignment.getInstancesfromReplicaGroup(partitionId, groupId);
+            oldReplicaGroupPartitionAssignment.getInstancesFromReplicaGroup(partitionId, groupId);
         List<String> newReplicaGroup = new ArrayList<>();
         boolean removeGroup = false;
 
@@ -373,7 +373,7 @@ public class ReplicaGroupRebalanceSegmentStrategy implements RebalanceSegmentStr
       List<String> referenceReplicaGroup = new ArrayList<>();
       for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
         List<String> serversInReplicaGroup =
-            replicaGroupPartitionAssignment.getInstancesfromReplicaGroup(partitionId, replicaId);
+            replicaGroupPartitionAssignment.getInstancesFromReplicaGroup(partitionId, replicaId);
         if (replicaId == 0) {
           // We need to keep the first replica group in case of mirroring.
           referenceReplicaGroup.addAll(serversInReplicaGroup);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java
index 577e0f7..cd604f5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java
@@ -84,7 +84,7 @@ public class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentS
     int index = 0;
     for (int groupId = 0; groupId < numReplicas; groupId++) {
       List<String> instancesInReplicaGroup =
-          replicaGroupPartitionAssignment.getInstancesfromReplicaGroup(partitionNumber, groupId);
+          replicaGroupPartitionAssignment.getInstancesFromReplicaGroup(partitionNumber, groupId);
       int numInstances = instancesInReplicaGroup.size();
       if (mirrorAssignmentAcrossReplicaGroups) {
         // Randomly pick the index and use the same index for all replica groups.
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java
index e1bf7c2..31466d6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java
@@ -98,7 +98,7 @@ public class ReplicaGroupTestUtils {
     // Check if the servers in a replica group covers all segments
     for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
       for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        List<String> replicaGroup = replicaGroupMapping.getInstancesfromReplicaGroup(partitionId, replicaId);
+        List<String> replicaGroup = replicaGroupMapping.getInstancesFromReplicaGroup(partitionId, replicaId);
         Set<String> replicaGroupSegments = new HashSet<>();
         for (String server : replicaGroup) {
           for (String segment : serverToSegments.get(server)) {
@@ -124,7 +124,7 @@ public class ReplicaGroupTestUtils {
         for (int serverIndex = 0; serverIndex < replicaGroupConfig.getNumInstancesPerPartition(); serverIndex++) {
           Set<String> mirrorSegments = new HashSet<>();
           for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-            List<String> replicaGroup = replicaGroupMapping.getInstancesfromReplicaGroup(partitionId, replicaId);
+            List<String> replicaGroup = replicaGroupMapping.getInstancesFromReplicaGroup(partitionId, replicaId);
             String server = replicaGroup.get(serverIndex);
             Set<String> currentSegments = new HashSet<>(serverToSegments.get(server));
             mirrorSegments.addAll(currentSegments);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org