You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ri...@apache.org on 2022/03/06 10:06:52 UTC

[pinot] branch master updated: Cache ideal state and external view paths in RoutingEntry (#8296)

This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f67b5aa  Cache ideal state and external view paths in RoutingEntry (#8296)
f67b5aa is described below

commit f67b5aacc5b494a2f5d78e87d67a84ea3aadc99a
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Sun Mar 6 10:06:30 2022 +0000

    Cache ideal state and external view paths in RoutingEntry (#8296)
    
    * cache ideal state and external view paths in RoutingEntry
    
    * review comments
---
 .../pinot/broker/routing/RoutingManager.java       | 51 ++++++++++++++--------
 1 file changed, 33 insertions(+), 18 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
index ca14dd0..ef97aee 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
@@ -134,10 +134,9 @@ public class RoutingManager implements ClusterChangeHandler {
     List<String> idealStatePaths = new ArrayList<>(numTables);
     List<String> externalViewPaths = new ArrayList<>(numTables);
     for (Map.Entry<String, RoutingEntry> entry : _routingEntryMap.entrySet()) {
-      String tableNameWithType = entry.getKey();
       routingEntries.add(entry.getValue());
-      idealStatePaths.add(_idealStatePathPrefix + tableNameWithType);
-      externalViewPaths.add(_externalViewPathPrefix + tableNameWithType);
+      idealStatePaths.add(entry.getValue()._idealStatePath);
+      externalViewPaths.add(entry.getValue()._externalViewPath);
     }
     Stat[] idealStateStats = _zkDataAccessor.getStats(idealStatePaths, AccessOption.PERSISTENT);
     Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths, AccessOption.PERSISTENT);
@@ -154,13 +153,13 @@ public class RoutingManager implements ClusterChangeHandler {
           String tableNameWithType = routingEntry.getTableNameWithType();
           tablesToUpdate.add(tableNameWithType);
           try {
-            IdealState idealState = getIdealState(tableNameWithType);
+            IdealState idealState = getIdealState(routingEntry._idealStatePath);
             if (idealState == null) {
               LOGGER.warn("Failed to find ideal state for table: {}, skipping updating routing entry",
                   tableNameWithType);
               continue;
             }
-            ExternalView externalView = getExternalView(tableNameWithType);
+            ExternalView externalView = getExternalView(routingEntry._externalViewPath);
             if (externalView == null) {
               LOGGER.warn("Failed to find external view for table: {}, skipping updating routing entry",
                   tableNameWithType);
@@ -185,9 +184,9 @@ public class RoutingManager implements ClusterChangeHandler {
   }
 
   @Nullable
-  private IdealState getIdealState(String tableNameWithType) {
+  private IdealState getIdealState(String idealStatePath) {
     Stat stat = new Stat();
-    ZNRecord znRecord = _zkDataAccessor.get(_idealStatePathPrefix + tableNameWithType, stat, AccessOption.PERSISTENT);
+    ZNRecord znRecord = _zkDataAccessor.get(idealStatePath, stat, AccessOption.PERSISTENT);
     if (znRecord != null) {
       znRecord.setVersion(stat.getVersion());
       return new IdealState(znRecord);
@@ -197,9 +196,9 @@ public class RoutingManager implements ClusterChangeHandler {
   }
 
   @Nullable
-  private ExternalView getExternalView(String tableNameWithType) {
+  private ExternalView getExternalView(String externalViewPath) {
     Stat stat = new Stat();
-    ZNRecord znRecord = _zkDataAccessor.get(_externalViewPathPrefix + tableNameWithType, stat, AccessOption.PERSISTENT);
+    ZNRecord znRecord = _zkDataAccessor.get(externalViewPath, stat, AccessOption.PERSISTENT);
     if (znRecord != null) {
       znRecord.setVersion(stat.getVersion());
       return new ExternalView(znRecord);
@@ -303,11 +302,13 @@ public class RoutingManager implements ClusterChangeHandler {
     TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType);
 
-    IdealState idealState = getIdealState(tableNameWithType);
+    String idealStatePath = getIdealStatePath(tableNameWithType);
+    IdealState idealState = getIdealState(idealStatePath);
     Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
     int idealStateVersion = idealState.getRecord().getVersion();
 
-    ExternalView externalView = getExternalView(tableNameWithType);
+    String externalViewPath = getExternalViewPath(tableNameWithType);
+    ExternalView externalView = getExternalView(externalViewPath);
     int externalViewVersion;
     // NOTE: External view might be null for new created tables. In such case, create an empty one and set the version
     // to -1 to ensure the version does not match the next external view
@@ -355,11 +356,11 @@ public class RoutingManager implements ClusterChangeHandler {
         TableConfig offlineTableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
         Preconditions.checkState(offlineTableConfig != null, "Failed to find table config for table: %s",
             offlineTableName);
-        IdealState offlineTableIdealState = getIdealState(offlineTableName);
+        IdealState offlineTableIdealState = getIdealState(getIdealStatePath(offlineTableName));
         Preconditions.checkState(offlineTableIdealState != null, "Failed to find ideal state for table: %s",
             offlineTableName);
         // NOTE: External view might be null for new created tables. In such case, create an empty one.
-        ExternalView offlineTableExternalView = getExternalView(offlineTableName);
+        ExternalView offlineTableExternalView = getExternalView(getExternalViewPath(offlineTableName));
         if (offlineTableExternalView == null) {
           offlineTableExternalView = new ExternalView(offlineTableName);
         }
@@ -380,8 +381,9 @@ public class RoutingManager implements ClusterChangeHandler {
     Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() : null;
 
     RoutingEntry routingEntry =
-        new RoutingEntry(tableNameWithType, segmentPreSelector, segmentSelector, segmentPruners, instanceSelector,
-            idealStateVersion, externalViewVersion, timeBoundaryManager, queryTimeoutMs);
+        new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, segmentPreSelector, segmentSelector,
+            segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, timeBoundaryManager,
+            queryTimeoutMs);
     if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
       LOGGER.info("Built routing for table: {}", tableNameWithType);
     } else {
@@ -477,6 +479,14 @@ public class RoutingManager implements ClusterChangeHandler {
     return new RoutingTable(serverInstanceToSegmentsMap, selectionResult.getUnavailableSegments());
   }
 
+  private String getIdealStatePath(String tableNameWithType) {
+    return _idealStatePathPrefix + tableNameWithType;
+  }
+
+  private String getExternalViewPath(String tableNameWithType) {
+    return _externalViewPathPrefix + tableNameWithType;
+  }
+
   /**
    * Returns the time boundary info for the given offline table, or {@code null} if the routing or time boundary does
    * not exist.
@@ -504,6 +514,8 @@ public class RoutingManager implements ClusterChangeHandler {
 
   private static class RoutingEntry {
     final String _tableNameWithType;
+    final String _idealStatePath;
+    final String _externalViewPath;
     final SegmentPreSelector _segmentPreSelector;
     final SegmentSelector _segmentSelector;
     final List<SegmentPruner> _segmentPruners;
@@ -516,11 +528,14 @@ public class RoutingManager implements ClusterChangeHandler {
     // Time boundary manager is only available for the offline part of the hybrid table
     transient TimeBoundaryManager _timeBoundaryManager;
 
-    RoutingEntry(String tableNameWithType, SegmentPreSelector segmentPreSelector, SegmentSelector segmentSelector,
-        List<SegmentPruner> segmentPruners, InstanceSelector instanceSelector, int lastUpdateIdealStateVersion,
-        int lastUpdateExternalViewVersion, @Nullable TimeBoundaryManager timeBoundaryManager,
+    RoutingEntry(String tableNameWithType, String idealStatePath, String externalViewPath,
+        SegmentPreSelector segmentPreSelector, SegmentSelector segmentSelector, List<SegmentPruner> segmentPruners,
+        InstanceSelector instanceSelector, int lastUpdateIdealStateVersion, int lastUpdateExternalViewVersion,
+        @Nullable TimeBoundaryManager timeBoundaryManager,
         @Nullable Long queryTimeoutMs) {
       _tableNameWithType = tableNameWithType;
+      _idealStatePath = idealStatePath;
+      _externalViewPath = externalViewPath;
       _segmentPreSelector = segmentPreSelector;
       _segmentSelector = segmentSelector;
       _segmentPruners = segmentPruners;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org