You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ri...@apache.org on 2022/03/06 10:06:52 UTC
[pinot] branch master updated: Cache ideal state and external view paths in RoutingEntry (#8296)
This is an automated email from the ASF dual-hosted git repository.
richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f67b5aa Cache ideal state and external view paths in RoutingEntry (#8296)
f67b5aa is described below
commit f67b5aacc5b494a2f5d78e87d67a84ea3aadc99a
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Sun Mar 6 10:06:30 2022 +0000
Cache ideal state and external view paths in RoutingEntry (#8296)
* cache ideal state and external view paths in RoutingEntry
* review comments
---
.../pinot/broker/routing/RoutingManager.java | 51 ++++++++++++++--------
1 file changed, 33 insertions(+), 18 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
index ca14dd0..ef97aee 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
@@ -134,10 +134,9 @@ public class RoutingManager implements ClusterChangeHandler {
List<String> idealStatePaths = new ArrayList<>(numTables);
List<String> externalViewPaths = new ArrayList<>(numTables);
for (Map.Entry<String, RoutingEntry> entry : _routingEntryMap.entrySet()) {
- String tableNameWithType = entry.getKey();
routingEntries.add(entry.getValue());
- idealStatePaths.add(_idealStatePathPrefix + tableNameWithType);
- externalViewPaths.add(_externalViewPathPrefix + tableNameWithType);
+ idealStatePaths.add(entry.getValue()._idealStatePath);
+ externalViewPaths.add(entry.getValue()._externalViewPath);
}
Stat[] idealStateStats = _zkDataAccessor.getStats(idealStatePaths, AccessOption.PERSISTENT);
Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths, AccessOption.PERSISTENT);
@@ -154,13 +153,13 @@ public class RoutingManager implements ClusterChangeHandler {
String tableNameWithType = routingEntry.getTableNameWithType();
tablesToUpdate.add(tableNameWithType);
try {
- IdealState idealState = getIdealState(tableNameWithType);
+ IdealState idealState = getIdealState(routingEntry._idealStatePath);
if (idealState == null) {
LOGGER.warn("Failed to find ideal state for table: {}, skipping updating routing entry",
tableNameWithType);
continue;
}
- ExternalView externalView = getExternalView(tableNameWithType);
+ ExternalView externalView = getExternalView(routingEntry._externalViewPath);
if (externalView == null) {
LOGGER.warn("Failed to find external view for table: {}, skipping updating routing entry",
tableNameWithType);
@@ -185,9 +184,9 @@ public class RoutingManager implements ClusterChangeHandler {
}
@Nullable
- private IdealState getIdealState(String tableNameWithType) {
+ private IdealState getIdealState(String idealStatePath) {
Stat stat = new Stat();
- ZNRecord znRecord = _zkDataAccessor.get(_idealStatePathPrefix + tableNameWithType, stat, AccessOption.PERSISTENT);
+ ZNRecord znRecord = _zkDataAccessor.get(idealStatePath, stat, AccessOption.PERSISTENT);
if (znRecord != null) {
znRecord.setVersion(stat.getVersion());
return new IdealState(znRecord);
@@ -197,9 +196,9 @@ public class RoutingManager implements ClusterChangeHandler {
}
@Nullable
- private ExternalView getExternalView(String tableNameWithType) {
+ private ExternalView getExternalView(String externalViewPath) {
Stat stat = new Stat();
- ZNRecord znRecord = _zkDataAccessor.get(_externalViewPathPrefix + tableNameWithType, stat, AccessOption.PERSISTENT);
+ ZNRecord znRecord = _zkDataAccessor.get(externalViewPath, stat, AccessOption.PERSISTENT);
if (znRecord != null) {
znRecord.setVersion(stat.getVersion());
return new ExternalView(znRecord);
@@ -303,11 +302,13 @@ public class RoutingManager implements ClusterChangeHandler {
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType);
- IdealState idealState = getIdealState(tableNameWithType);
+ String idealStatePath = getIdealStatePath(tableNameWithType);
+ IdealState idealState = getIdealState(idealStatePath);
Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
int idealStateVersion = idealState.getRecord().getVersion();
- ExternalView externalView = getExternalView(tableNameWithType);
+ String externalViewPath = getExternalViewPath(tableNameWithType);
+ ExternalView externalView = getExternalView(externalViewPath);
int externalViewVersion;
// NOTE: External view might be null for new created tables. In such case, create an empty one and set the version
// to -1 to ensure the version does not match the next external view
@@ -355,11 +356,11 @@ public class RoutingManager implements ClusterChangeHandler {
TableConfig offlineTableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
Preconditions.checkState(offlineTableConfig != null, "Failed to find table config for table: %s",
offlineTableName);
- IdealState offlineTableIdealState = getIdealState(offlineTableName);
+ IdealState offlineTableIdealState = getIdealState(getIdealStatePath(offlineTableName));
Preconditions.checkState(offlineTableIdealState != null, "Failed to find ideal state for table: %s",
offlineTableName);
// NOTE: External view might be null for new created tables. In such case, create an empty one.
- ExternalView offlineTableExternalView = getExternalView(offlineTableName);
+ ExternalView offlineTableExternalView = getExternalView(getExternalViewPath(offlineTableName));
if (offlineTableExternalView == null) {
offlineTableExternalView = new ExternalView(offlineTableName);
}
@@ -380,8 +381,9 @@ public class RoutingManager implements ClusterChangeHandler {
Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() : null;
RoutingEntry routingEntry =
- new RoutingEntry(tableNameWithType, segmentPreSelector, segmentSelector, segmentPruners, instanceSelector,
- idealStateVersion, externalViewVersion, timeBoundaryManager, queryTimeoutMs);
+ new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, segmentPreSelector, segmentSelector,
+ segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, timeBoundaryManager,
+ queryTimeoutMs);
if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
LOGGER.info("Built routing for table: {}", tableNameWithType);
} else {
@@ -477,6 +479,14 @@ public class RoutingManager implements ClusterChangeHandler {
return new RoutingTable(serverInstanceToSegmentsMap, selectionResult.getUnavailableSegments());
}
+ private String getIdealStatePath(String tableNameWithType) {
+ return _idealStatePathPrefix + tableNameWithType;
+ }
+
+ private String getExternalViewPath(String tableNameWithType) {
+ return _externalViewPathPrefix + tableNameWithType;
+ }
+
/**
* Returns the time boundary info for the given offline table, or {@code null} if the routing or time boundary does
* not exist.
@@ -504,6 +514,8 @@ public class RoutingManager implements ClusterChangeHandler {
private static class RoutingEntry {
final String _tableNameWithType;
+ final String _idealStatePath;
+ final String _externalViewPath;
final SegmentPreSelector _segmentPreSelector;
final SegmentSelector _segmentSelector;
final List<SegmentPruner> _segmentPruners;
@@ -516,11 +528,14 @@ public class RoutingManager implements ClusterChangeHandler {
// Time boundary manager is only available for the offline part of the hybrid table
transient TimeBoundaryManager _timeBoundaryManager;
- RoutingEntry(String tableNameWithType, SegmentPreSelector segmentPreSelector, SegmentSelector segmentSelector,
- List<SegmentPruner> segmentPruners, InstanceSelector instanceSelector, int lastUpdateIdealStateVersion,
- int lastUpdateExternalViewVersion, @Nullable TimeBoundaryManager timeBoundaryManager,
+ RoutingEntry(String tableNameWithType, String idealStatePath, String externalViewPath,
+ SegmentPreSelector segmentPreSelector, SegmentSelector segmentSelector, List<SegmentPruner> segmentPruners,
+ InstanceSelector instanceSelector, int lastUpdateIdealStateVersion, int lastUpdateExternalViewVersion,
+ @Nullable TimeBoundaryManager timeBoundaryManager,
@Nullable Long queryTimeoutMs) {
_tableNameWithType = tableNameWithType;
+ _idealStatePath = idealStatePath;
+ _externalViewPath = externalViewPath;
_segmentPreSelector = segmentPreSelector;
_segmentSelector = segmentSelector;
_segmentPruners = segmentPruners;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org