Posted to commits@pinot.apache.org by "61yao (via GitHub)" <gi...@apache.org> on 2023/02/28 11:08:59 UTC

[GitHub] [pinot] 61yao opened a new pull request, #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

61yao opened a new pull request, #10350:
URL: https://github.com/apache/pinot/pull/10350

   An instance is no longer excluded from the replica group just because a new segment is unavailable on it. We also don't report new segments as unavailable segments. 
   
   Definition of new segments: segments that were created less than 5 minutes ago and whose external view hasn't converged with the ideal state. 
   
   Implications of this implementation:    
   
   1) Inconsistency across servers when querying.
   We think this inconsistency is acceptable because the change greatly improves query availability.
   
   2) We skip reporting a segment as an error if a new segment goes to the error state. 
   This is also acceptable because if the segment doesn’t return to a healthy state within 5 minutes, we will treat it as an error. 
   However, since we only update new segments upon an assignment/instance change, some of the new segments may expire by the clock without us tracking them, so we may skip reporting issues for these segments. 
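
The age check behind the definition of a new segment can be sketched roughly as follows. This is only an illustration, assuming a segment counts as new while it is younger than 5 minutes and its external view has not converged; `NewSegmentCheck`, `NEW_SEGMENT_EXPIRATION_MILLIS` and the `converged` flag are hypothetical names, not the code in this PR.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not the PR's actual code.
final class NewSegmentCheck {
  // Assumed threshold, taken from the 5 minutes mentioned in the description.
  static final long NEW_SEGMENT_EXPIRATION_MILLIS = Duration.ofMinutes(5).toMillis();

  private NewSegmentCheck() {
  }

  // A segment is treated as new while it is younger than the threshold
  // and its external view has not yet converged with the ideal state.
  static boolean isNewSegment(long creationMillis, long nowMillis, boolean converged) {
    return !converged && nowMillis - creationMillis < NEW_SEGMENT_EXPIRATION_MILLIS;
  }
}
```

Passing `nowMillis` in, rather than reading the clock inside the helper, keeps the check deterministic and easy to test.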
   
   
   Details can be seen in:
   
   https://docs.google.com/document/d/1nfNZxVOXMIykabsC73Ab83SnHrNaZrlLRmr3UkFTE_c/edit#heading=h.ze4g3qr2kl2v
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1137543416


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;
+  private Map<String, SegmentState> _newSegmentStates;
 
-  // These 2 variables are needed for instance selection (multi-threaded), so make them volatile
-  private volatile Map<String, List<String>> _segmentToEnabledInstancesMap;
-  private volatile Set<String> _unavailableSegments;
+  // _segmentStateSnapshot is needed for instance selection (multi-threaded), so make them volatile
+  private volatile SegmentStateSnapshot _segmentStateSnapshot;
+  private Clock _clock;
 
   BaseInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
-      @Nullable AdaptiveServerSelector adaptiveServerSelector) {
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    this(tableNameWithType, brokerMetrics, adaptiveServerSelector, propertyStore, Clock.systemUTC());
+  }
+
+  // Test only for clock injection.
+  BaseInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      Clock clock) {
     _tableNameWithType = tableNameWithType;
     _brokerMetrics = brokerMetrics;
     _adaptiveServerSelector = adaptiveServerSelector;
+    _newSegmentStates = new HashMap<>();
+    _propertyStore = propertyStore;
+    _clock = clock;
+  }
+
+  // Get the segment where the ideal state hasn't converged with external view and which doesn't have error instance.
+  private static List<String> getPotentialNewSegments(IdealState idealState, ExternalView externalView,
+      Set<String> onlineSegments) {
+    List<String> potentialNewSegments = new ArrayList<>();
+    Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields();
+    for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
+      String segment = entry.getKey();
+      // Only track online segments
+      if (!onlineSegments.contains(segment)) {
+        continue;
+      }
+      Map<String, String> idealStateInstanceStateMap = entry.getValue();
+      // Segments with missing external view are considered as new.
+      Map<String, String> externalViewInstanceStateMap =
+          externalViewAssignment.getOrDefault(segment, Collections.emptyMap());
+      List<String> onlineInstance = new ArrayList<>();
+      boolean couldBeNewSegment = true;

Review Comment:
   updated the logic. PTAL



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;

Review Comment:
   removed





[GitHub] [pinot] snleee commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "snleee (via GitHub)" <gi...@apache.org>.
snleee commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1133278292


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;
+  private Map<String, SegmentState> _newSegmentStates;
 
-  // These 2 variables are needed for instance selection (multi-threaded), so make them volatile
-  private volatile Map<String, List<String>> _segmentToEnabledInstancesMap;
-  private volatile Set<String> _unavailableSegments;
+  // _segmentStateSnapshot is needed for instance selection (multi-threaded), so make them volatile
+  private volatile SegmentStateSnapshot _segmentStateSnapshot;

Review Comment:
   Currently, `_segmentStateSnapshot` is updated in `onInstanceChange()` and `onAssignmentChange()`. This means that we only update the `newSegment` list when there's a change in instance or idealstate/externalview.
   
   If there's no update on instance/idealstate/externalview for a long time (a valid case), I think that we currently do not move newSegments to oldSegments. Please correct me if I'm wrong.
   
   I remember we discussed that we may need to add a background thread that periodically looks at the newSegment states and promotes them to oldSegment once it's been more than 5 minutes since their creation.
   
   What was our conclusion here?
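
For reference, the background-thread idea could look roughly like the sketch below. It is not what the PR does; `NewSegmentExpirer` and all of its members are hypothetical, and it assumes new segments are tracked in a map from segment name to creation time.

```java
import java.time.Clock;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: periodically promotes expired new segments to old ones.
final class NewSegmentExpirer implements AutoCloseable {
  private static final long EXPIRATION_MILLIS = TimeUnit.MINUTES.toMillis(5);

  // Segment name -> creation time; only segments still considered new are kept here.
  private final Map<String, Long> _newSegmentCreationMillis = new ConcurrentHashMap<>();
  private final Clock _clock;
  private final ScheduledExecutorService _executor = Executors.newSingleThreadScheduledExecutor(r -> {
    Thread thread = new Thread(r, "new-segment-expirer");
    thread.setDaemon(true);
    return thread;
  });

  NewSegmentExpirer(Clock clock) {
    _clock = clock;
  }

  void track(String segment, long creationMillis) {
    _newSegmentCreationMillis.put(segment, creationMillis);
  }

  boolean isNew(String segment) {
    return _newSegmentCreationMillis.containsKey(segment);
  }

  // Drops every tracked segment whose age has reached the threshold.
  void expireOnce() {
    long nowMillis = _clock.millis();
    _newSegmentCreationMillis.values().removeIf(creationMillis -> nowMillis - creationMillis >= EXPIRATION_MILLIS);
  }

  // Runs expireOnce() repeatedly, independent of instance/assignment changes.
  void start(long periodMillis) {
    _executor.scheduleWithFixedDelay(this::expireOnce, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  @Override
  public void close() {
    _executor.shutdownNow();
  }
}
```

A real implementation would also have to republish the volatile snapshot after each expiry so that query threads see the promotion.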





[GitHub] [pinot] Jackie-Jiang commented on pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #10350:
URL: https://github.com/apache/pinot/pull/10350#issuecomment-1482035378

   This PR was mistakenly closed. Cloned in #10466




[GitHub] [pinot] codecov-commenter commented on pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #10350:
URL: https://github.com/apache/pinot/pull/10350#issuecomment-1448042685

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#10350](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (358b113) into [master](https://codecov.io/gh/apache/pinot/commit/1fce07ed77ff31af6d3b85a6b03a4b76632ed133?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1fce07e) will **decrease** coverage by `55.03%`.
   > The diff coverage is `98.59%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #10350       +/-   ##
   =============================================
   - Coverage     68.82%   13.80%   -55.03%     
   + Complexity     5851      221     -5630     
   =============================================
     Files          2027     1978       -49     
     Lines        109891   107712     -2179     
     Branches      16685    16453      -232     
   =============================================
   - Hits          75637    14870    -60767     
   - Misses        28897    91667    +62770     
   + Partials       5357     1175     -4182     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `13.80% <98.59%> (+0.03%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...instanceselector/ReplicaGroupInstanceSelector.java](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL1JlcGxpY2FHcm91cEluc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `35.00% <ø> (ø)` | |
   | [...va/org/apache/pinot/spi/utils/CommonConstants.java](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQ29tbW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <0.00%> (-24.40%)` | :arrow_down: |
   | [...routing/instanceselector/BaseInstanceSelector.java](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0Jhc2VJbnN0YW5jZVNlbGVjdG9yLmphdmE=) | `97.32% <100.00%> (ø)` | |
   | [...ting/instanceselector/InstanceSelectorFactory.java](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3JGYWN0b3J5LmphdmE=) | `75.00% <100.00%> (ø)` | |
   | [...ceselector/StrictReplicaGroupInstanceSelector.java](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL1N0cmljdFJlcGxpY2FHcm91cEluc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `96.99% <100.00%> (+1.53%)` | :arrow_up: |
   | [...src/main/java/org/apache/pinot/sql/FilterKind.java](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcWwvRmlsdGVyS2luZC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ain/java/org/apache/pinot/core/data/table/Key.java](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL0tleS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...in/java/org/apache/pinot/spi/utils/BytesUtils.java](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQnl0ZXNVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ain/java/org/apache/pinot/common/CustomObject.java](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vQ3VzdG9tT2JqZWN0LmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...n/java/org/apache/pinot/core/data/table/Table.java](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL1RhYmxlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1573 more](https://codecov.io/gh/apache/pinot/pull/10350?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1122290823


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java:
##########
@@ -51,7 +51,8 @@ public BalancedInstanceSelector(String tableNameWithType, BrokerMetrics brokerMe
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
+      long nowMillis) {

Review Comment:
   Why are we passing time here? There should be 2 maps passed to the selector, one for the old segments and one for the new segments





[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1122377836


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java:
##########
@@ -51,7 +51,8 @@ public BalancedInstanceSelector(String tableNameWithType, BrokerMetrics brokerMe
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
+      long nowMillis) {

Review Comment:
   We pass the time in because we want to use the same timestamp to check new-segment age for both segment instance assignment and unavailable-segment reporting. Also, getting the clock time is a system call, which is not cheap.
   
   New segments are processed only in StrictReplicaGroup. I didn't change the behavior of the other selectors.
   
   New-segment information is used directly through a private member in StrictReplicaGroup, so we don't need to pass it.
   Please see the implementation of StrictReplicaGroup.
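
A minimal sketch of the shared-timestamp argument above: the clock is read once and the same `nowMillis` feeds both the routing decision and the unavailable-segment decision, so the two cannot disagree about whether a segment is new. The class and method names (`NewSegmentAgeCheck`, `keepOfflineCandidates`, `shouldReportUnavailable`) and the strict `<` boundary are assumptions made for illustration; only the 5-minute window comes from the PR description.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

final class NewSegmentAgeCheck {
  // Assumption: segments younger than 5 minutes count as new; the exact boundary used in the PR is not shown.
  static final long NEW_SEGMENT_EXPIRATION_MILLIS = 5 * 60 * 1000L;

  static boolean isNew(long creationMillis, long nowMillis) {
    return nowMillis - creationMillis < NEW_SEGMENT_EXPIRATION_MILLIS;
  }

  // Routing decision (hypothetical): a new segment keeps its not-yet-online instances as candidates.
  static boolean keepOfflineCandidates(long creationMillis, long nowMillis) {
    return isNew(creationMillis, nowMillis);
  }

  // Reporting decision (hypothetical): only old segments without an online instance are reported.
  static boolean shouldReportUnavailable(long creationMillis, boolean hasOnlineInstance, long nowMillis) {
    return !hasOnlineInstance && !isNew(creationMillis, nowMillis);
  }

  public static void main(String[] args) {
    Clock clock = Clock.fixed(Instant.ofEpochMilli(600_000L), ZoneOffset.UTC);
    // Read the clock once, then reuse the value so both decisions agree on what "new" means.
    long nowMillis = clock.millis();
    long creationMillis = 500_000L;
    System.out.println(keepOfflineCandidates(creationMillis, nowMillis));
    System.out.println(shouldReportUnavailable(creationMillis, false, nowMillis));
  }
}
```

If each decision read the clock separately, a segment sitting right at the boundary could be treated as new by one check and old by the other within the same request.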
    





[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1122374608


##########
pinot-broker/pom.xml:
##########
@@ -153,5 +153,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>

Review Comment:
   It makes things like advancing the clock easier. I can remove it if we are concerned about the added dependency.
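
The dependency under discussion is not visible in this hunk. For comparison, a manually advanced clock can also be sketched with only `java.time.Clock` from the standard library. `MutableTestClock` and `advanceMillis` are hypothetical names, not part of the PR.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

// A clock whose time only moves when advanceMillis() is called (hypothetical helper).
final class MutableTestClock extends Clock {
  private long _millis;

  MutableTestClock(long startMillis) {
    _millis = startMillis;
  }

  void advanceMillis(long deltaMillis) {
    _millis += deltaMillis;
  }

  @Override
  public ZoneId getZone() {
    return ZoneOffset.UTC;
  }

  @Override
  public Clock withZone(ZoneId zone) {
    // The zone is irrelevant for millisecond arithmetic, so the same instance is returned.
    return this;
  }

  @Override
  public Instant instant() {
    return Instant.ofEpochMilli(_millis);
  }

  @Override
  public long millis() {
    return _millis;
  }
}
```

Any code that takes a `Clock` (as the `_clock.millis()` call in this PR suggests) could accept such an instance, at the cost of maintaining the helper in the code base.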





[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1137544004


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -141,139 +233,116 @@ public void onInstancesChange(Set<String> enabledInstances, List<String> changed
    */
   @Override
   public void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
+    long nowMillis = _clock.millis();
+    onAssignmentChange(idealState, externalView, onlineSegments, nowMillis, true);
+  }
+
+  private void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments,
+      long nowMillis, boolean refreshNewSegments) {
+    if (refreshNewSegments) {
+      // If this call is not from init, we use existing state to check whether a segment is new.
+      // And we use system clock time as an approximation of segment creation time.
+      for (String segment : onlineSegments) {
+        if (!_segmentToOnlineInstancesMap.containsKey(segment) && !_newSegmentStates.containsKey(segment)) {
+          _newSegmentStates.put(segment, new SegmentState(nowMillis));
+        }
+      }
+    }
     _segmentToOnlineInstancesMap.clear();
-    _segmentToOfflineInstancesMap.clear();
-    _instanceToSegmentsMap.clear();
 
     // Update the cached maps
-    updateSegmentMaps(idealState, externalView, onlineSegments, _segmentToOnlineInstancesMap,
-        _segmentToOfflineInstancesMap, _instanceToSegmentsMap);
+    updateSegmentMaps(idealState, externalView, onlineSegments, _segmentToOnlineInstancesMap, _newSegmentStates,
+        nowMillis);
 
-    // Generate a new map from segment to enabled ONLINE/CONSUMING instances and a new set of unavailable segments (no
-    // enabled instance or all enabled instances are in ERROR state)
-    Map<String, List<String>> segmentToEnabledInstancesMap =
-        new HashMap<>(HashUtil.getHashMapCapacity(_segmentToOnlineInstancesMap.size()));
-    Set<String> unavailableSegments = new HashSet<>();
-    // NOTE: Put null as the value when there is no enabled instances for a segment so that segmentToEnabledInstancesMap
-    // always contains all segments. With this, in onInstancesChange() we can directly iterate over
-    // segmentToEnabledInstancesMap.entrySet() and modify the value without changing the map entries.
-    for (Map.Entry<String, List<String>> entry : _segmentToOnlineInstancesMap.entrySet()) {
-      String segment = entry.getKey();
-      List<String> enabledInstancesForSegment =
-          calculateEnabledInstancesForSegment(segment, entry.getValue(), unavailableSegments);
-      segmentToEnabledInstancesMap.put(segment, enabledInstancesForSegment);
-    }
-
-    _segmentToEnabledInstancesMap = segmentToEnabledInstancesMap;
-    _unavailableSegments = unavailableSegments;
+    _segmentStateSnapshot =
+        SegmentStateSnapshot.createSnapshot(_tableNameWithType, _segmentToOnlineInstancesMap, _newSegmentStates,
+            _enabledInstances, _brokerMetrics);
   }
 
   /**
    * Updates the segment maps based on the given ideal state, external view and online segments (segments with
    * ONLINE/CONSUMING instances in the ideal state and pre-selected by the {@link SegmentPreSelector}).
    */
-  void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<String> onlineSegments,
-      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, List<String>> segmentToOfflineInstancesMap,
-      Map<String, List<String>> instanceToSegmentsMap) {
-    // Iterate over the external view instead of the online segments so that the map lookups are performed on the
-    // HashSet instead of the TreeSet for performance
-    // NOTE: Do not track segments not in the external view because it is a valid state when the segment is new added
-    Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
-    for (Map.Entry<String, Map<String, String>> entry : externalView.getRecord().getMapFields().entrySet()) {
+  protected void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<String> onlineSegments,
+      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, SegmentState> newSegmentStateMap,
+      long nowMillis) {
+    // NOTE: Segments with missing external view are considered as new.
+    Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields();
+    // Iterate over the ideal state instead of the external view since this will cover segment with missing external
+    // view.
+    for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
       String segment = entry.getKey();
-
       // Only track online segments
       if (!onlineSegments.contains(segment)) {
         continue;
       }
-
-      Map<String, String> externalViewInstanceStateMap = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
-      List<String> onlineInstances = new ArrayList<>(externalViewInstanceStateMap.size());
-      List<String> offlineInstances = new ArrayList<>();
-      segmentToOnlineInstancesMap.put(segment, onlineInstances);
-      segmentToOfflineInstancesMap.put(segment, offlineInstances);
+      Map<String, String> idealStateInstanceStateMap = entry.getValue();
+      Map<String, String> externalViewInstanceStateMap =
+          externalViewAssignment.getOrDefault(segment, Collections.emptyMap());
+      // Sort the online instances for replica-group routing to work. For multiple segments with the same online
+      // instances, if the list is sorted, the same index in the list will always point to the same instance.
+      Set<String> onlineInstances = new TreeSet<>();
       for (Map.Entry<String, String> instanceStateEntry : externalViewInstanceStateMap.entrySet()) {
         String instance = instanceStateEntry.getKey();
-
         // Only track instances within the ideal state
         // NOTE: When an instance is not in the ideal state, the instance will drop the segment soon, and it is not safe
         // to query this instance for the segment. This could happen when a segment is moved from one instance to
         // another instance.
         if (!idealStateInstanceStateMap.containsKey(instance)) {
           continue;
         }
-
         String externalViewState = instanceStateEntry.getValue();
         // Do not track instances in ERROR state
         if (!externalViewState.equals(SegmentStateModel.ERROR)) {
-          instanceToSegmentsMap.computeIfAbsent(instance, k -> new ArrayList<>()).add(segment);
-          if (externalViewState.equals(SegmentStateModel.OFFLINE)) {
-            offlineInstances.add(instance);
-          } else {
+          if (SegmentStateModel.isOnline(externalViewState)) {
             onlineInstances.add(instance);
           }
+        } else {
+          // Segment with error state instance should be considered old.
+          newSegmentStateMap.remove(segment);
         }
       }
-
-      // Sort the online instances for replica-group routing to work. For multiple segments with the same online
-      // instances, if the list is sorted, the same index in the list will always point to the same instance.
-      if (!(externalViewInstanceStateMap instanceof SortedMap)) {
-        onlineInstances.sort(null);
-        offlineInstances.sort(null);
-      }
-    }
-  }
-
-  /**
-   * Calculates the enabled ONLINE/CONSUMING instances for the given segment, and updates the unavailable segments (no
-   * enabled instance or all enabled instances are in ERROR state).
-   */
-  @Nullable
-  private List<String> calculateEnabledInstancesForSegment(String segment, List<String> onlineInstancesForSegment,
-      Set<String> unavailableSegments) {
-    List<String> enabledInstancesForSegment = new ArrayList<>(onlineInstancesForSegment.size());
-    for (String onlineInstance : onlineInstancesForSegment) {
-      if (_enabledInstances.contains(onlineInstance)) {
-        enabledInstancesForSegment.add(onlineInstance);
+      SegmentState state = newSegmentStateMap.get(segment);

Review Comment:
   This could be an old segment too, once it gets retired by the clock.
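
A minimal sketch of the point above: an entry found in the new-segment map is not necessarily still new, so the lookup has to re-check the age against the current time and retire the entry when it has expired. `NewSegmentTracker`, its methods, and the 5-minute constant are assumptions for illustration; the PR's `SegmentState` carries more information than a creation time.

```java
import java.util.HashMap;
import java.util.Map;

final class NewSegmentTracker {
  // Assumption: 5-minute window, taken from the PR description.
  static final long NEW_SEGMENT_EXPIRATION_MILLIS = 5 * 60 * 1000L;

  private final Map<String, Long> _creationMillisBySegment = new HashMap<>();

  void track(String segment, long creationMillis) {
    // Keep the first observed creation time for a segment.
    _creationMillisBySegment.putIfAbsent(segment, creationMillis);
  }

  boolean isTracked(String segment) {
    return _creationMillisBySegment.containsKey(segment);
  }

  // Returns true only if the segment is tracked AND still inside the window.
  // An expired entry is removed so later lookups treat the segment as old.
  boolean isNew(String segment, long nowMillis) {
    Long creationMillis = _creationMillisBySegment.get(segment);
    if (creationMillis == null) {
      return false;
    }
    if (nowMillis - creationMillis >= NEW_SEGMENT_EXPIRATION_MILLIS) {
      _creationMillisBySegment.remove(segment);
      return false;
    }
    return true;
  }
}
```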



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;
+  private Map<String, SegmentState> _newSegmentStates;
 
-  // These 2 variables are needed for instance selection (multi-threaded), so make them volatile
-  private volatile Map<String, List<String>> _segmentToEnabledInstancesMap;
-  private volatile Set<String> _unavailableSegments;
+  // _segmentStateSnapshot is needed for instance selection (multi-threaded), so make them volatile
+  private volatile SegmentStateSnapshot _segmentStateSnapshot;

Review Comment:
   This will be a follow-up PR, as discussed.





[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1144063853


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java:
##########
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+// Represents an instance candidate for segment.

Review Comment:
   done.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentState.java:
##########
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+
+// Class used to represent the instance state for new segment.

Review Comment:
   done



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentStateSnapshot.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *  This class represents a snapshot state of segments used for routing purpose.
+ *  Note that this class is immutable after creation.
+ *
+ *  For old segments, we return a list of online instances with online flags set to true.
+ *  For old segments without any online instances, we report them as unavailable segments.
+ *
+ *  For new segments, we return a list of candidate instance with online flags to indicate whether the instance is
+ *  online or not.
+ *  We don't report new segment as unavailable segments because it is valid for new segments to be not online at all.
+ */
+@Immutable
+public class SegmentStateSnapshot {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentStateSnapshot.class);
+
+  private Map<String, List<SegmentInstanceCandidate>> _segmentCandidates;
+  private Set<String> _unavailableSegments;
+
+  // Create a segment state snapshot based on some in-memory states to be used for routing.

Review Comment:
   done





[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1137542653


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;

Review Comment:
   Added a comment and changed it to a list, since we need to preserve the order.
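
A minimal sketch of why an ordered list matters here, assuming the candidates are sorted by instance name so that, for segments with the same assignment, the same index always points to the same instance (as the replica-group comment in the diff describes). `CandidateListSketch`, `Candidate`, and `toSortedCandidates` are hypothetical names; the fields of the PR's `SegmentInstanceCandidate` are not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

final class CandidateListSketch {
  // Hypothetical stand-in for an instance candidate: instance name plus online flag.
  static final class Candidate {
    final String instance;
    final boolean online;

    Candidate(String instance, boolean online) {
      this.instance = instance;
      this.online = online;
    }
  }

  // Converts an unordered instance -> online map into a list sorted by instance name.
  static List<Candidate> toSortedCandidates(Map<String, Boolean> onlineByInstance) {
    List<Candidate> candidates = new ArrayList<>(onlineByInstance.size());
    for (Map.Entry<String, Boolean> entry : onlineByInstance.entrySet()) {
      candidates.add(new Candidate(entry.getKey(), entry.getValue()));
    }
    candidates.sort(Comparator.comparing(c -> c.instance));
    return candidates;
  }
}
```

A `HashMap` gives no iteration-order guarantee, so picking "the instance at index i" from it would not be stable across segments; the sorted list makes that index meaningful.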





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1122440824


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java:
##########
@@ -51,7 +51,8 @@ public BalancedInstanceSelector(String tableNameWithType, BrokerMetrics brokerMe
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
+      long nowMillis) {

Review Comment:
   We want the query-time operation to be very lightweight. Basically, we should process old/new segments and put them into a cache before query time.
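
A minimal sketch of the pattern suggested above, under the assumption that all classification work happens when the assignment changes and the query path only reads a precomputed, immutable structure through a volatile field. `RoutingSnapshotHolder`, `onChange`, and the round-robin `select` are illustrative and not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RoutingSnapshotHolder {
  // Written by the change-handling thread, read by query threads.
  private volatile Map<String, List<String>> _segmentToInstances = Collections.emptyMap();

  // Change path: do the copying (and any old/new classification) here, off the query path,
  // then publish the finished snapshot with a single volatile write.
  void onChange(Map<String, List<String>> latest) {
    Map<String, List<String>> copy = new HashMap<>();
    for (Map.Entry<String, List<String>> entry : latest.entrySet()) {
      copy.put(entry.getKey(), Collections.unmodifiableList(new ArrayList<>(entry.getValue())));
    }
    _segmentToInstances = Collections.unmodifiableMap(copy);
  }

  // Query path: one volatile read plus map and list lookups; no clock reads, no recomputation.
  String select(String segment, int requestId) {
    List<String> instances = _segmentToInstances.get(segment);
    if (instances == null || instances.isEmpty()) {
      return null;
    }
    return instances.get(Math.floorMod(requestId, instances.size()));
  }
}
```

The trade-off is that anything depending on the current time, such as a segment aging out of the new window, only takes effect at the next snapshot rebuild.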





[GitHub] [pinot] snleee commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "snleee (via GitHub)" <gi...@apache.org>.
snleee commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1133267301


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;

Review Comment:
   Can we add a comment describing what the key and value are supposed to be?
   
   I guess it is the following:
   key = instance
   value = true (`online`) / false (`offline`)



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;
+  private Map<String, SegmentState> _newSegmentStates;
 
-  // These 2 variables are needed for instance selection (multi-threaded), so make them volatile
-  private volatile Map<String, List<String>> _segmentToEnabledInstancesMap;
-  private volatile Set<String> _unavailableSegments;
+  // _segmentStateSnapshot is needed for instance selection (multi-threaded), so make them volatile
+  private volatile SegmentStateSnapshot _segmentStateSnapshot;
+  private Clock _clock;
 
   BaseInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
-      @Nullable AdaptiveServerSelector adaptiveServerSelector) {
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    this(tableNameWithType, brokerMetrics, adaptiveServerSelector, propertyStore, Clock.systemUTC());
+  }
+
+  // Test only for clock injection.
+  BaseInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      Clock clock) {
     _tableNameWithType = tableNameWithType;
     _brokerMetrics = brokerMetrics;
     _adaptiveServerSelector = adaptiveServerSelector;
+    _newSegmentStates = new HashMap<>();
+    _propertyStore = propertyStore;
+    _clock = clock;
+  }
+
+  // Get the segment where the ideal state hasn't converged with external view and which doesn't have error instance.
+  private static List<String> getPotentialNewSegments(IdealState idealState, ExternalView externalView,
+      Set<String> onlineSegments) {
+    List<String> potentialNewSegments = new ArrayList<>();
+    Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields();
+    for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
+      String segment = entry.getKey();
+      // Only track online segments
+      if (!onlineSegments.contains(segment)) {
+        continue;
+      }
+      Map<String, String> idealStateInstanceStateMap = entry.getValue();
+      // Segments with missing external view are considered as new.
+      Map<String, String> externalViewInstanceStateMap =
+          externalViewAssignment.getOrDefault(segment, Collections.emptyMap());
+      List<String> onlineInstance = new ArrayList<>();
+      boolean couldBeNewSegment = true;

Review Comment:
   Do we need this flag? Since we already break on L163, we can probably remove the check on `couldBeNewSegment` and still get the same logic.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -141,139 +233,116 @@ public void onInstancesChange(Set<String> enabledInstances, List<String> changed
    */
   @Override
   public void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
+    long nowMillis = _clock.millis();
+    onAssignmentChange(idealState, externalView, onlineSegments, nowMillis, true);
+  }
+
+  private void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments,
+      long nowMillis, boolean refreshNewSegments) {
+    if (refreshNewSegments) {
+      // If this call is not from init, we use existing state to check whether a segment is new.
+      // And we use system clock time as an approximation of segment creation time.
+      for (String segment : onlineSegments) {
+        if (!_segmentToOnlineInstancesMap.containsKey(segment) && !_newSegmentStates.containsKey(segment)) {
+          _newSegmentStates.put(segment, new SegmentState(nowMillis));
+        }
+      }
+    }
     _segmentToOnlineInstancesMap.clear();
-    _segmentToOfflineInstancesMap.clear();
-    _instanceToSegmentsMap.clear();
 
     // Update the cached maps
-    updateSegmentMaps(idealState, externalView, onlineSegments, _segmentToOnlineInstancesMap,
-        _segmentToOfflineInstancesMap, _instanceToSegmentsMap);
+    updateSegmentMaps(idealState, externalView, onlineSegments, _segmentToOnlineInstancesMap, _newSegmentStates,
+        nowMillis);
 
-    // Generate a new map from segment to enabled ONLINE/CONSUMING instances and a new set of unavailable segments (no
-    // enabled instance or all enabled instances are in ERROR state)
-    Map<String, List<String>> segmentToEnabledInstancesMap =
-        new HashMap<>(HashUtil.getHashMapCapacity(_segmentToOnlineInstancesMap.size()));
-    Set<String> unavailableSegments = new HashSet<>();
-    // NOTE: Put null as the value when there is no enabled instances for a segment so that segmentToEnabledInstancesMap
-    // always contains all segments. With this, in onInstancesChange() we can directly iterate over
-    // segmentToEnabledInstancesMap.entrySet() and modify the value without changing the map entries.
-    for (Map.Entry<String, List<String>> entry : _segmentToOnlineInstancesMap.entrySet()) {
-      String segment = entry.getKey();
-      List<String> enabledInstancesForSegment =
-          calculateEnabledInstancesForSegment(segment, entry.getValue(), unavailableSegments);
-      segmentToEnabledInstancesMap.put(segment, enabledInstancesForSegment);
-    }
-
-    _segmentToEnabledInstancesMap = segmentToEnabledInstancesMap;
-    _unavailableSegments = unavailableSegments;
+    _segmentStateSnapshot =
+        SegmentStateSnapshot.createSnapshot(_tableNameWithType, _segmentToOnlineInstancesMap, _newSegmentStates,
+            _enabledInstances, _brokerMetrics);
   }
 
   /**
    * Updates the segment maps based on the given ideal state, external view and online segments (segments with
    * ONLINE/CONSUMING instances in the ideal state and pre-selected by the {@link SegmentPreSelector}).
    */
-  void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<String> onlineSegments,
-      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, List<String>> segmentToOfflineInstancesMap,
-      Map<String, List<String>> instanceToSegmentsMap) {
-    // Iterate over the external view instead of the online segments so that the map lookups are performed on the
-    // HashSet instead of the TreeSet for performance
-    // NOTE: Do not track segments not in the external view because it is a valid state when the segment is new added
-    Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
-    for (Map.Entry<String, Map<String, String>> entry : externalView.getRecord().getMapFields().entrySet()) {
+  protected void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<String> onlineSegments,
+      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, SegmentState> newSegmentStateMap,
+      long nowMillis) {
+    // NOTE: Segments with missing external view are considered as new.
+    Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields();
+    // Iterate over the ideal state instead of the external view since this will cover segment with missing external
+    // view.
+    for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
       String segment = entry.getKey();
-
       // Only track online segments
       if (!onlineSegments.contains(segment)) {
         continue;
       }
-
-      Map<String, String> externalViewInstanceStateMap = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
-      List<String> onlineInstances = new ArrayList<>(externalViewInstanceStateMap.size());
-      List<String> offlineInstances = new ArrayList<>();
-      segmentToOnlineInstancesMap.put(segment, onlineInstances);
-      segmentToOfflineInstancesMap.put(segment, offlineInstances);
+      Map<String, String> idealStateInstanceStateMap = entry.getValue();
+      Map<String, String> externalViewInstanceStateMap =
+          externalViewAssignment.getOrDefault(segment, Collections.emptyMap());
+      // Sort the online instances for replica-group routing to work. For multiple segments with the same online
+      // instances, if the list is sorted, the same index in the list will always point to the same instance.
+      Set<String> onlineInstances = new TreeSet<>();
       for (Map.Entry<String, String> instanceStateEntry : externalViewInstanceStateMap.entrySet()) {
         String instance = instanceStateEntry.getKey();
-
         // Only track instances within the ideal state
         // NOTE: When an instance is not in the ideal state, the instance will drop the segment soon, and it is not safe
         // to query this instance for the segment. This could happen when a segment is moved from one instance to
         // another instance.
         if (!idealStateInstanceStateMap.containsKey(instance)) {
           continue;
         }
-
         String externalViewState = instanceStateEntry.getValue();
         // Do not track instances in ERROR state
         if (!externalViewState.equals(SegmentStateModel.ERROR)) {
-          instanceToSegmentsMap.computeIfAbsent(instance, k -> new ArrayList<>()).add(segment);
-          if (externalViewState.equals(SegmentStateModel.OFFLINE)) {
-            offlineInstances.add(instance);
-          } else {
+          if (SegmentStateModel.isOnline(externalViewState)) {
             onlineInstances.add(instance);
           }
+        } else {
+          // Segment with error state instance should be considered old.
+          newSegmentStateMap.remove(segment);
         }
       }
-
-      // Sort the online instances for replica-group routing to work. For multiple segments with the same online
-      // instances, if the list is sorted, the same index in the list will always point to the same instance.
-      if (!(externalViewInstanceStateMap instanceof SortedMap)) {
-        onlineInstances.sort(null);
-        offlineInstances.sort(null);
-      }
-    }
-  }
-
-  /**
-   * Calculates the enabled ONLINE/CONSUMING instances for the given segment, and updates the unavailable segments (no
-   * enabled instance or all enabled instances are in ERROR state).
-   */
-  @Nullable
-  private List<String> calculateEnabledInstancesForSegment(String segment, List<String> onlineInstancesForSegment,
-      Set<String> unavailableSegments) {
-    List<String> enabledInstancesForSegment = new ArrayList<>(onlineInstancesForSegment.size());
-    for (String onlineInstance : onlineInstancesForSegment) {
-      if (_enabledInstances.contains(onlineInstance)) {
-        enabledInstancesForSegment.add(onlineInstance);
+      SegmentState state = newSegmentStateMap.get(segment);

Review Comment:
   Let's rename this to `newSegmentState`



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;

Review Comment:
   Can we remove `_instanceToSegmentsMap`?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;
+  private Map<String, SegmentState> _newSegmentStates;
 
-  // These 2 variables are needed for instance selection (multi-threaded), so make them volatile
-  private volatile Map<String, List<String>> _segmentToEnabledInstancesMap;
-  private volatile Set<String> _unavailableSegments;
+  // _segmentStateSnapshot is needed for instance selection (multi-threaded), so make them volatile
+  private volatile SegmentStateSnapshot _segmentStateSnapshot;

Review Comment:
   Currently, `_segmentStateSnapshot` is only updated in `onInstancesChange()` and `onAssignmentChange()`. This means that we only update the `newSegment` list when there is a change in the instances or in the ideal state/external view.
   
   If there is no update to the instances, ideal state, or external view for a long time (which is a valid case), I think we currently never move newSegments to oldSegments. Please correct me if I'm wrong.
   
   If this is the case, should we consider adding a background thread that periodically checks the `SegmentStateSnapshot` and promotes newSegments to oldSegments after 5 minutes?
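
A minimal sketch of what such a periodic promotion could look like. Everything here is an assumption for illustration: `NewSegmentExpirer`, its methods, and the standalone creation-time map are hypothetical and not part of this PR, and the 5-minute threshold is taken from the PR description. In the real selector, the promotion step would also need to rebuild and publish a fresh `SegmentStateSnapshot`.

```java
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not in the PR): retires "new" segments once they pass the age threshold. */
class NewSegmentExpirer {
  // Assumed threshold; the PR description uses 5 minutes.
  static final long NEW_SEGMENT_EXPIRATION_MILLIS = TimeUnit.MINUTES.toMillis(5);

  // Segment name -> (approximate) creation time in millis.
  private final Map<String, Long> _creationMillisBySegment = new ConcurrentHashMap<>();
  private final Clock _clock;

  NewSegmentExpirer(Clock clock) {
    _clock = clock;
  }

  /** Starts tracking a segment as new; keeps the first recorded creation time. */
  void track(String segment, long creationMillis) {
    _creationMillisBySegment.putIfAbsent(segment, creationMillis);
  }

  boolean isTracked(String segment) {
    return _creationMillisBySegment.containsKey(segment);
  }

  /** Removes and returns (sorted) the segments that are older than the threshold at nowMillis. */
  List<String> promoteExpired(long nowMillis) {
    List<String> promoted = new ArrayList<>();
    for (Map.Entry<String, Long> entry : _creationMillisBySegment.entrySet()) {
      if (nowMillis - entry.getValue() > NEW_SEGMENT_EXPIRATION_MILLIS
          && _creationMillisBySegment.remove(entry.getKey(), entry.getValue())) {
        promoted.add(entry.getKey());
      }
    }
    Collections.sort(promoted);
    // A real implementation would rebuild the routing snapshot here if 'promoted' is non-empty.
    return promoted;
  }

  /** Schedules the periodic check on the given executor; the caller owns the executor's lifecycle. */
  ScheduledFuture<?> start(ScheduledExecutorService executor, long periodMillis) {
    return executor.scheduleWithFixedDelay(() -> promoteExpired(_clock.millis()), periodMillis, periodMillis,
        TimeUnit.MILLISECONDS);
  }
}
```

Passing the time into `promoteExpired` explicitly keeps the expiry logic testable without waiting on a real clock, in the same spirit as the `Clock` injection in this PR.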





[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1122509587


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java:
##########
@@ -51,7 +51,8 @@ public BalancedInstanceSelector(String tableNameWithType, BrokerMetrics brokerMe
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
+      long nowMillis) {

Review Comment:
   PTAL. 
   The current logic still checks whether a segment is new or old at query time, but it doesn't update the cache. Let me know if this is still too complicated; I can remove the check to make the query-time logic lighter.
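
To illustrate the idea of a read-only check at query time, here is a minimal sketch under stated assumptions. `QueryTimeNewSegmentCheck` and its methods are hypothetical names and a simplified stand-in, not the classes in this PR; the 5-minute threshold comes from the PR description. The state is fixed at construction, and each query only passes in `nowMillis`, so nothing cached is modified on the query path.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a routing snapshot; not the actual class in this PR. */
final class QueryTimeNewSegmentCheck {
  // Assumed threshold, matching the 5 minutes in the PR description.
  static final long NEW_SEGMENT_EXPIRATION_MILLIS = 5 * 60 * 1000L;

  // Creation time for segments that were tracked as new when the snapshot was built.
  private final Map<String, Long> _creationMillisBySegment;
  // Segment -> (instance -> true if online, false if offline).
  private final Map<String, Map<String, Boolean>> _candidatesBySegment;

  QueryTimeNewSegmentCheck(Map<String, Long> creationMillisBySegment,
      Map<String, Map<String, Boolean>> candidatesBySegment) {
    _creationMillisBySegment = Collections.unmodifiableMap(new HashMap<>(creationMillisBySegment));
    _candidatesBySegment = Collections.unmodifiableMap(new HashMap<>(candidatesBySegment));
  }

  /** Read-only check: a tracked segment counts as new while it is within the threshold. */
  boolean isNew(String segment, long nowMillis) {
    Long creationMillis = _creationMillisBySegment.get(segment);
    return creationMillis != null && nowMillis - creationMillis <= NEW_SEGMENT_EXPIRATION_MILLIS;
  }

  /** Returns the sorted online instances for the segment. */
  List<String> onlineInstances(String segment) {
    List<String> online = new ArrayList<>();
    for (Map.Entry<String, Boolean> entry
        : _candidatesBySegment.getOrDefault(segment, Collections.emptyMap()).entrySet()) {
      if (entry.getValue()) {
        online.add(entry.getKey());
      }
    }
    Collections.sort(online);
    return online;
  }

  /** Only old segments without any online instance are reported as unavailable. */
  boolean isUnavailable(String segment, long nowMillis) {
    return !isNew(segment, nowMillis) && onlineInstances(segment).isEmpty();
  }
}
```

With this shape, a segment that ages past the threshold is treated as old by the next query even if no assignment or instance change has arrived in the meantime, at the cost of one extra comparison per tracked segment per query.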





[GitHub] [pinot] snleee commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "snleee (via GitHub)" <gi...@apache.org>.
snleee commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1143481613


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java:
##########
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+// Represents an instance candidate for segment.

Review Comment:
   We normally use the following Javadoc-style convention for class-level comments:
   
   ```
   /**
    *   comment
    */
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentState.java:
##########
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+
+// Class used to represent the instance state for new segment.

Review Comment:
   Same here



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentStateSnapshot.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *  This class represents a snapshot state of segments used for routing purpose.
+ *  Note that this class is immutable after creation.
+ *
+ *  For old segments, we return a list of online instances with online flags set to true.
+ *  For old segments without any online instances, we report them as unavailable segments.
+ *
+ *  For new segments, we return a list of candidate instance with online flags to indicate whether the instance is
+ *  online or not.
+ *  We don't report new segment as unavailable segments because it is valid for new segments to be not online at all.
+ */
+@Immutable
+public class SegmentStateSnapshot {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentStateSnapshot.class);
+
+  private Map<String, List<SegmentInstanceCandidate>> _segmentCandidates;
+  private Set<String> _unavailableSegments;
+
+  // Create a segment state snapshot based on some in-memory states to be used for routing.

Review Comment:
   (nit) We also use `/** ... */` for comments on methods.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,162 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
- * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
+ * Base implementation of instance selector. Selector maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ * <p>
+ * Special handling of new segment: It is common for new segment to be partially available or not available at all in
+ * all instances.
+ * 1) We don't report new segment as unavailable segments.
+ * 2) To increase query availability, unavailable
+ * instance for new segment won't be excluded for instance selection. When it is selected, we don't serve the new
+ * segment.
+ * <p>
+ * Definition of new segment:
+ * 1) Segment created more than 5 minutes ago.
+ * - If we first see a segment via initialization, we look up segment creation time from zookeeper.
+ * - If we first see a segment via onAssignmentChange initialization, we use the calling time of onAssignmentChange
+ * as approximation.
+ * 2) We retire new segment as old when:
+ * - The creation time is more than 5 minutes ago
+ * - Any instance for new segment is in error state
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistent selection of new segments across queries. (some queries will serve new segments and others won't)
+ * 2) When there is no state update from helix, new segments won't be retired because of the time passing.

Review Comment:
   Let's add a `TODO` comment about enhancing this. Otherwise, people will think that this is the expected behavior.





[GitHub] [pinot] snleee commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "snleee (via GitHub)" <gi...@apache.org>.
snleee commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1143464290


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;
+  private Map<String, SegmentState> _newSegmentStates;
 
-  // These 2 variables are needed for instance selection (multi-threaded), so make them volatile
-  private volatile Map<String, List<String>> _segmentToEnabledInstancesMap;
-  private volatile Set<String> _unavailableSegments;
+  // _segmentStateSnapshot is needed for instance selection (multi-threaded), so make them volatile
+  private volatile SegmentStateSnapshot _segmentStateSnapshot;

Review Comment:
   👍 





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1134509046


##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java:
##########
@@ -106,12 +139,13 @@ public void testInstanceSelector() {
     String offlineTableName = "testTable_OFFLINE";
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
     AdaptiveServerSelector adaptiveServerSelector = null;
-    BalancedInstanceSelector balancedInstanceSelector = new BalancedInstanceSelector(offlineTableName, brokerMetrics,
-        adaptiveServerSelector);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
+    BalancedInstanceSelector balancedInstanceSelector =

Review Comment:
   This test already covers all types of instance selectors. Do we need 3 new separate tests for them? We don't want to keep duplicate tests, as that adds maintenance overhead.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -113,18 +114,27 @@ public static class Helix {
     public static final String UNTAGGED_MINION_INSTANCE = "minion_untagged";
 
     public static class StateModel {
+      // public only for testing purpose.
+      public static final long NEW_SEGMENT_EXPIRATION_MILLIS = TimeUnit.MINUTES.toMillis(5);
       public static class SegmentStateModel {
         public static final String ONLINE = "ONLINE";
         public static final String OFFLINE = "OFFLINE";
         public static final String ERROR = "ERROR";
         public static final String CONSUMING = "CONSUMING";
+        public static boolean isOnline(String state) {
+          return state.equals(ONLINE) || state.equals(CONSUMING);
+        }
       }
 
       public static class BrokerResourceStateModel {
         public static final String ONLINE = "ONLINE";
         public static final String OFFLINE = "OFFLINE";
         public static final String ERROR = "ERROR";
       }
+
+      public static boolean isNewSegment(long creationTimeMillis, long nowMillis) {

Review Comment:
   This doesn't belong in this class. You may put it in the `InstanceSelector` interface, or add a new util class for it.
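   
   A minimal sketch of the second option, assuming a standalone helper is acceptable. `NewSegmentUtils` is a hypothetical name, and the comparison is an assumption based on the 5-minute `NEW_SEGMENT_EXPIRATION_MILLIS` constant in the diff above, not the PR's actual method body:
   
   ```java
   import java.util.concurrent.TimeUnit;

   // Hypothetical util class; the name and placement are illustrative only.
   final class NewSegmentUtils {
     // Same 5-minute window as the constant in the diff under review.
     static final long NEW_SEGMENT_EXPIRATION_MILLIS = TimeUnit.MINUTES.toMillis(5);

     private NewSegmentUtils() {
     }

     // Assumption: a segment counts as "new" while its age is within the window (inclusive).
     static boolean isNewSegment(long creationTimeMillis, long nowMillis) {
       return nowMillis - creationTimeMillis <= NEW_SEGMENT_EXPIRATION_MILLIS;
     }
   }
   ```
   
   Keeping the constant next to the check would also let `CommonConstants` stay free of routing logic.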



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java:
##########
@@ -32,27 +32,29 @@ public interface AdaptiveServerSelector {
    * Picks the best server to route a query from the list of candidate servers.
    *
    * @param serverCandidates Candidate servers from which the best server should be chosen.
-   * @return server identifier
+   * @return server identifier and whether server is online
    */
-  String select(List<String> serverCandidates);
+  Pair<String, Boolean> select(List<Pair<String, Boolean>> serverCandidates);

Review Comment:
   ONLINE/OFFLINE is the state of a single segment on a server. Here we should probably keep only the online servers.
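   
   A rough sketch of what that could look like, assuming the caller holds a candidate-to-online map like the `HashMap<String, Boolean>` in `SegmentState`. `OnlineCandidateFilter` and `onlineOnly` are hypothetical names, and the result would be handed to the unchanged `select(List<String>)`:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;

   // Hypothetical helper; not part of the PR.
   final class OnlineCandidateFilter {
     private OnlineCandidateFilter() {
     }

     // Returns only the candidates flagged online, preserving the map's iteration order.
     static List<String> onlineOnly(Map<String, Boolean> candidates) {
       List<String> online = new ArrayList<>();
       for (Map.Entry<String, Boolean> entry : candidates.entrySet()) {
         // Boolean.TRUE.equals also treats a null flag as "not online".
         if (Boolean.TRUE.equals(entry.getValue())) {
           online.add(entry.getKey());
         }
       }
       return online;
     }
   }
   ```
   
   With this, the `AdaptiveServerSelector` interface would not need to know about per-segment state at all.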



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java:
##########
@@ -58,13 +62,28 @@
  * transitioning/error scenario (external view does not match ideal state), if a segment is down on S1, we mark all
  * segments with the same assignment ([S1, S2, S3]) down on S1 to ensure that we always route the segments to the same
  * replica-group.
- * </pre>
+ *
+ * Note that New segment won't be used to exclude instance from serving where new segment is unavailable.
+ *
+ *  </pre>
  */
 public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSelector {
 
-  public StrictReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics, @Nullable
-      AdaptiveServerSelector adaptiveServerSelector) {
-    super(tableNameWithType, brokerMetrics, adaptiveServerSelector);
+  private boolean isNewSegment(String segment, Map<String, SegmentState> newSegmentStateMap, long nowMillis) {
+    SegmentState state = newSegmentStateMap.get(segment);
+    return state != null && state.isNew(nowMillis);
+  }
+
+  public StrictReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    this(tableNameWithType, brokerMetrics, adaptiveServerSelector, propertyStore, Clock.systemUTC());
+  }
+
+  // Test only for clock injection.

Review Comment:
   (minor) Use `@VisibleForTesting` instead of a comment



##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -617,7 +617,8 @@ protected void waitForDocsLoaded(long timeoutMs, boolean raiseError, String tabl
       @Override
       public Boolean apply(@Nullable Void aVoid) {
         try {
-          return getCurrentCountStarResult(tableName) == countStarResult;
+          long curCount = getCurrentCountStarResult(tableName);

Review Comment:
   (minor) unnecessary change



##########
pinot-broker/pom.xml:
##########
@@ -153,5 +153,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>

Review Comment:
   A lighter library (to avoid conflicts) that we can use here:
   ```
       <dependency>
           <groupId>com.mercateo</groupId>
           <artifactId>test-clock</artifactId>
           <version>1.0.2</version>
           <scope>test</scope>
       </dependency>
   ```
   
   Let's put it in the root pom with the version, then reference that here





[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1137544402


##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java:
##########
@@ -106,12 +139,13 @@ public void testInstanceSelector() {
     String offlineTableName = "testTable_OFFLINE";
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
     AdaptiveServerSelector adaptiveServerSelector = null;
-    BalancedInstanceSelector balancedInstanceSelector = new BalancedInstanceSelector(offlineTableName, brokerMetrics,
-        adaptiveServerSelector);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
+    BalancedInstanceSelector balancedInstanceSelector =

Review Comment:
   done



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java:
##########
@@ -58,13 +62,28 @@
  * transitioning/error scenario (external view does not match ideal state), if a segment is down on S1, we mark all
  * segments with the same assignment ([S1, S2, S3]) down on S1 to ensure that we always route the segments to the same
  * replica-group.
- * </pre>
+ *
+ * Note that New segment won't be used to exclude instance from serving where new segment is unavailable.
+ *
+ *  </pre>
  */
 public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSelector {
 
-  public StrictReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics, @Nullable
-      AdaptiveServerSelector adaptiveServerSelector) {
-    super(tableNameWithType, brokerMetrics, adaptiveServerSelector);
+  private boolean isNewSegment(String segment, Map<String, SegmentState> newSegmentStateMap, long nowMillis) {
+    SegmentState state = newSegmentStateMap.get(segment);
+    return state != null && state.isNew(nowMillis);
+  }
+
+  public StrictReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    this(tableNameWithType, brokerMetrics, adaptiveServerSelector, propertyStore, Clock.systemUTC());
+  }
+
+  // Test only for clock injection.

Review Comment:
   removed





[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1137545177


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java:
##########
@@ -32,27 +32,29 @@ public interface AdaptiveServerSelector {
    * Picks the best server to route a query from the list of candidate servers.
    *
    * @param serverCandidates Candidate servers from which the best server should be chosen.
-   * @return server identifier
+   * @return server identifier and whether server is online
    */
-  String select(List<String> serverCandidates);
+  Pair<String, Boolean> select(List<Pair<String, Boolean>> serverCandidates);

Review Comment:
   done





[GitHub] [pinot] 61yao closed pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao closed pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup
URL: https://github.com/apache/pinot/pull/10350




[GitHub] [pinot] 61yao commented on pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on PR #10350:
URL: https://github.com/apache/pinot/pull/10350#issuecomment-1447994320

   @Jackie-Jiang @snleee




[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1122384566


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java:
##########
@@ -51,7 +51,8 @@ public BalancedInstanceSelector(String tableNameWithType, BrokerMetrics brokerMe
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
+      long nowMillis) {

Review Comment:
   Hmm... I should probably pass the new segment info in to be thread-safe. Let me update the code.
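   
   Roughly what I have in mind, as a sketch only: the updater builds an immutable snapshot and publishes it through a single volatile reference, and each query reads that reference once. `RoutingStateHolder`, `Snapshot`, and the field names below are hypothetical placeholders, not the final classes:
   
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical holder; names are illustrative only.
   final class RoutingStateHolder {
     // Immutable view handed to query threads.
     static final class Snapshot {
       final Map<String, Long> _newSegmentCreationMillis;
       final Set<String> _unavailableSegments;

       Snapshot(Map<String, Long> newSegmentCreationMillis, Set<String> unavailableSegments) {
         // Defensive copies, so later mutation by the updater is not visible to readers.
         _newSegmentCreationMillis = Collections.unmodifiableMap(new HashMap<>(newSegmentCreationMillis));
         _unavailableSegments = Collections.unmodifiableSet(new HashSet<>(unavailableSegments));
       }
     }

     // A single volatile reference: readers see either the old or the new snapshot, never a mix.
     private volatile Snapshot _snapshot =
         new Snapshot(Collections.<String, Long>emptyMap(), Collections.<String>emptySet());

     // Called by the (single) updater thread on assignment/instance change.
     void publish(Map<String, Long> newSegmentCreationMillis, Set<String> unavailableSegments) {
       _snapshot = new Snapshot(newSegmentCreationMillis, unavailableSegments);
     }

     // Called by query threads; read once and use the returned object for the whole selection.
     Snapshot current() {
       return _snapshot;
     }
   }
   ```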





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1122296076


##########
pinot-broker/pom.xml:
##########
@@ -153,5 +153,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>

Review Comment:
   Do we need this dependency? I feel we only need to mock `millis()`
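   
   A minimal sketch of one dependency-free alternative, assuming a hand-rolled `Clock` subclass is acceptable in tests. `MutableTestClock` is a hypothetical name, not something in the PR:
   
   ```java
   import java.time.Clock;
   import java.time.Instant;
   import java.time.ZoneId;
   import java.time.ZoneOffset;

   // Hypothetical test-only clock whose time is advanced manually.
   final class MutableTestClock extends Clock {
     private long _millis;

     MutableTestClock(long initialMillis) {
       _millis = initialMillis;
     }

     // Moves the clock forward (or backward, for a negative delta).
     void advanceMillis(long deltaMillis) {
       _millis += deltaMillis;
     }

     @Override
     public ZoneId getZone() {
       return ZoneOffset.UTC;
     }

     @Override
     public Clock withZone(ZoneId zone) {
       // The zone is irrelevant for epoch millis, so the same instance is returned.
       return this;
     }

     @Override
     public Instant instant() {
       return Instant.ofEpochMilli(_millis);
     }

     @Override
     public long millis() {
       return _millis;
     }
   }
   ```
   
   A test would pass this into the clock-injecting constructor and call `advanceMillis` to step past the 5-minute window.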





[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1137544792


##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -617,7 +617,8 @@ protected void waitForDocsLoaded(long timeoutMs, boolean raiseError, String tabl
       @Override
       public Boolean apply(@Nullable Void aVoid) {
         try {
-          return getCurrentCountStarResult(tableName) == countStarResult;
+          long curCount = getCurrentCountStarResult(tableName);

Review Comment:
   done



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -113,18 +114,27 @@ public static class Helix {
     public static final String UNTAGGED_MINION_INSTANCE = "minion_untagged";
 
     public static class StateModel {
+      // public only for testing purpose.
+      public static final long NEW_SEGMENT_EXPIRATION_MILLIS = TimeUnit.MINUTES.toMillis(5);
       public static class SegmentStateModel {
         public static final String ONLINE = "ONLINE";
         public static final String OFFLINE = "OFFLINE";
         public static final String ERROR = "ERROR";
         public static final String CONSUMING = "CONSUMING";
+        public static boolean isOnline(String state) {
+          return state.equals(ONLINE) || state.equals(CONSUMING);
+        }
       }
 
       public static class BrokerResourceStateModel {
         public static final String ONLINE = "ONLINE";
         public static final String OFFLINE = "OFFLINE";
         public static final String ERROR = "ERROR";
       }
+
+      public static boolean isNewSegment(long creationTimeMillis, long nowMillis) {

Review Comment:
   done





[GitHub] [pinot] 61yao commented on a diff in pull request #10350: [bugfix] Fix unavailable instances issues for StrictReplicaGroup

Posted by "61yao (via GitHub)" <gi...@apache.org>.
61yao commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1144063961


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,162 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
- * Base implementation of instance selector which maintains a map from segment to enabled ONLINE/CONSUMING server
+ * Base implementation of instance selector. Selector maintains a map from segment to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no enabled instance or all enabled instances are
  * in ERROR state).
+ * <p>
+ * Special handling of new segment: It is common for new segment to be partially available or not available at all in
+ * all instances.
+ * 1) We don't report new segment as unavailable segments.
+ * 2) To increase query availability, unavailable
+ * instance for new segment won't be excluded for instance selection. When it is selected, we don't serve the new
+ * segment.
+ * <p>
+ * Definition of new segment:
+ * 1) Segment created more than 5 minutes ago.
+ * - If we first see a segment via initialization, we look up segment creation time from zookeeper.
+ * - If we first see a segment via onAssignmentChange initialization, we use the calling time of onAssignmentChange
+ * as approximation.
+ * 2) We retire new segment as old when:
+ * - The creation time is more than 5 minutes ago
+ * - Any instance for new segment is in error state
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistent selection of new segments across queries. (some queries will serve new segments and others won't)
+ * 2) When there is no state update from helix, new segments won't be retired because of the time passing.

Review Comment:
   done.


