You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2019/04/17 20:54:48 UTC

[incubator-pinot] branch master updated: Improve partition aware routing when a server is down. (#4119)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 191255b  Improve partition aware routing when a server is down. (#4119)
191255b is described below

commit 191255b3f8049af484c682bc445ea1f1d0ada54d
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Wed Apr 17 13:54:43 2019 -0700

    Improve partition aware routing when a server is down. (#4119)
    
    When a server is down, the current partition-aware routing
    will always pick the next server. This PR improves the current
    approach by making the routing table builder distribute
    the load evenly among the available servers.
---
 .../BasePartitionAwareRoutingTableBuilder.java     | 47 +++++++------
 ...rtitionAwareOfflineRoutingTableBuilderTest.java | 79 ++++++++++++++++++++++
 2 files changed, 107 insertions(+), 19 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
index e8e8434..7b8b182 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.broker.routing.builder;
 
+import it.unimi.dsi.fastutil.ints.IntArrays;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -47,10 +48,10 @@ import org.slf4j.LoggerFactory;
  * for routing. The look up table is in the format of < segment_name -> (replica_id -> server_instance) >.
  *
  * When the query comes in, the routing algorithm is as follows:
- *   1. Randomly pick a replica id (or replica group id)
+ *   1. Shuffle the replica group ids
  *   2. For each segment of the given table,
  *      a. Check if the segment can be pruned. If pruned, go to the next segment.
- *      b. If not pruned, assign the segment to a server with the replica id that is picked above.
+ *      b. If not pruned, assign the segment to a server with the replica id based on the shuffled replica group ids.
  *
  */
 public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTableBuilder {
@@ -101,8 +102,15 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
     Map<String, List<String>> routingTable = new HashMap<>();
     SegmentPrunerContext prunerContext = new SegmentPrunerContext(request.getBrokerRequest());
 
-    // 1. Randomly pick a replica id
-    int replicaId = _random.nextInt(_numReplicas);
+    // Shuffle the replica group ids in order to satisfy:
+    // a. Pick a replica group in an evenly distributed fashion
+    // b. When a server is not available, the request should be distributed evenly among other available servers.
+    int[] shuffledReplicaGroupIds = new int[_numReplicas];
+    for (int i = 0; i < _numReplicas; i++) {
+      shuffledReplicaGroupIds[i] = i;
+    }
+    IntArrays.shuffle(shuffledReplicaGroupIds, _random);
+
     for (String segmentName : segmentsToQuery) {
       SegmentZKMetadata segmentZKMetadata = _segmentToZkMetadataMapping.get(segmentName);
 
@@ -110,25 +118,26 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
       boolean segmentPruned = (segmentZKMetadata != null) && _pruner.prune(segmentZKMetadata, prunerContext);
 
       if (!segmentPruned) {
-        // 2b. Segment cannot be pruned. Assign the segment to a server with the replica id picked above.
+        // 2b. Segment cannot be pruned. Assign the segment to a server based on the shuffled replica group ids
         Map<Integer, String> replicaIdToServerMap = segmentToReplicaToServerMap.get(segmentName);
-        String serverName = replicaIdToServerMap.get(replicaId);
-
-        // When the server is not available with this replica id, we need to pick another available server.
-        if (serverName == null) {
-          if (!replicaIdToServerMap.isEmpty()) {
-            serverName = replicaIdToServerMap.values().iterator().next();
-          } else {
-            // No server is found for this segment
-            continue;
+
+        String serverName = null;
+        for (int i = 0; i < _numReplicas; i++) {
+          serverName = replicaIdToServerMap.get(shuffledReplicaGroupIds[i]);
+          // If a server is found, update routing table for the current segment
+          if (serverName != null) {
+            break;
           }
         }
-        List<String> segmentsForServer = routingTable.get(serverName);
-        if (segmentsForServer == null) {
-          segmentsForServer = new ArrayList<>();
-          routingTable.put(serverName, segmentsForServer);
+
+        if (serverName != null) {
+          routingTable.computeIfAbsent(serverName, k -> new ArrayList<>()).add(segmentName);
+        } else {
+          // No server is found for this segment if the code reaches here
+
+          // TODO: we need to discuss and decide on how we will be handling this case since we are not returning the
+          // complete result here.
         }
-        segmentsForServer.add(segmentName);
       }
     }
 
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
index 5fe6322..e71dd8a 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
@@ -229,6 +229,66 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
     Assert.assertEquals(servers.size(), 2);
   }
 
+  @Test
+  public void testRoutingAfterOneServerDown() throws Exception {
+    NUM_REPLICA = 3;
+    NUM_PARTITION = 1;
+    NUM_SERVERS = 3;
+    NUM_SEGMENTS = 20;
+
+    // Create the fake property store
+    FakePropertyStore fakePropertyStore = new FakePropertyStore();
+
+    // Create the table config, partition mapping,
+    TableConfig tableConfig = buildOfflineTableConfig();
+
+    // Create the replica group id to server mapping
+    Map<Integer, List<String>> replicaToServerMapping = buildReplicaGroupMapping();
+
+    // Update segment zk metadata.
+    for (int i = 0; i < NUM_SEGMENTS; i++) {
+      String segmentName = "segment" + i;
+      int partition = i % NUM_PARTITION;
+      SegmentZKMetadata metadata = buildOfflineSegmentZKMetadata(segmentName, partition);
+      fakePropertyStore
+          .setContents(ZKMetadataProvider.constructPropertyStorePathForSegment(OFFLINE_TABLE_NAME, segmentName),
+              metadata.toZNRecord());
+    }
+
+    // Update replica group mapping zk metadata
+    updateReplicaGroupPartitionAssignment(OFFLINE_TABLE_NAME, fakePropertyStore);
+
+    // Create instance Configs
+    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+    for (int serverId = 0; serverId < NUM_SERVERS; serverId++) {
+      String serverName = "Server_localhost_" + serverId;
+      instanceConfigs.add(new InstanceConfig(serverName));
+    }
+
+    // Pick a server that is going to be down
+    Random random = new Random();
+    String downServer = instanceConfigs.get(random.nextInt(instanceConfigs.size())).getInstanceName();
+
+    // Create the fake external view with a down server
+    ExternalView externalView = buildExternalViewWithDownServer(OFFLINE_TABLE_NAME, replicaToServerMapping, downServer);
+
+    // Create the partition aware offline routing table builder
+    RoutingTableBuilder routingTableBuilder =
+        buildPartitionAwareOfflineRoutingTableBuilder(fakePropertyStore, tableConfig, externalView, instanceConfigs);
+
+    Set<String> servers = new HashSet<>();
+    for (int i = 0; i < 100; i++) {
+      String countStarQuery = "select count(*) from " + OFFLINE_TABLE_NAME;
+      Map<String, List<String>> routingTable =
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
+      Assert.assertEquals(routingTable.keySet().size(), 1);
+      servers.add(routingTable.keySet().iterator().next());
+    }
+
+    // Check if the other two available servers are getting picked
+    Assert.assertEquals(servers.size(), 2);
+  }
+
   private void updateReplicaGroupPartitionAssignment(String tableNameWithType, FakePropertyStore propertyStore) {
     // Create partition assignment mapping table.
     ReplicaGroupPartitionAssignment replicaGroupPartitionAssignment =
@@ -271,6 +331,25 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
     return externalView;
   }
 
+  private ExternalView buildExternalViewWithDownServer(String tableName, Map<Integer, List<String>> replicaGroupServers,
+      String downServer) throws Exception {
+    // Create External View
+    ExternalView externalView = new ExternalView(tableName);
+    for (int i = 0; i < NUM_SEGMENTS; i++) {
+      String segmentName = "segment" + i;
+      int serverIndex = i % (NUM_SERVERS / NUM_REPLICA);
+      for (List<String> serversInReplicaGroup : replicaGroupServers.values()) {
+        String serverName = serversInReplicaGroup.get(serverIndex);
+        if (serverName.equals(downServer)) {
+          externalView.setState(segmentName, serversInReplicaGroup.get(serverIndex), "OFFLINE");
+        } else {
+          externalView.setState(segmentName, serversInReplicaGroup.get(serverIndex), "ONLINE");
+        }
+      }
+    }
+    return externalView;
+  }
+
   private Map<Integer, List<String>> buildReplicaGroupMapping() {
     Map<Integer, List<String>> replicaGroupServers = new HashMap<>();
     int numServersPerReplica = NUM_SERVERS / NUM_REPLICA;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org