Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/06/02 18:59:37 UTC

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6778: upload missing LLC segment to segment store by controller periodic task

mcvsubbu commented on a change in pull request #6778:
URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r644165186



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +152,16 @@
   private final Lock[] _idealStateUpdateLocks;
   private final TableConfigCache _tableConfigCache;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private FileUploadDownloadClient _fileUploadDownloadClient;
+  // Map caching the LLC segment names without deep store download uri. Controller gets the LLC segment names from this map, and asks servers to upload the segments to segment store. This helps to alleviates excessive ZK access when fetching LLC segment list.

Review comment:
       Nit: this line is too long. Please wrap every comment line that exceeds the line-length limit. Maybe you need to update your IDE settings?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download url
+          if (segmentZKMetadata.getStatus() == Status.DONE && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          LOGGER.error("failed to fetch the LLC segment {} ZK metadata", segmentName);
+        }
+      }
+  }
+
+  // Only validate recently created LLC segment for missing deep store download url. The time range check is based on segment name. This step helps to alleviate ZK access.
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < _validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add segment store download uri in ZK. Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers.
+   * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}: all segments are available in segment store.", realtimeTableName);
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+
+    // Iterate through LLC segments and upload missing segment store copy by following steps:
+    //  1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding segment store download url.
+    while (!segmentQueue.isEmpty()) {
+      String segmentName = segmentQueue.poll();
+      // Check if it's null in case of the while condition doesn't stand true anymore in the step of dequeue. Dequeue returns null if queue is empty.
+      if (segmentName == null) {
+        break;
+      }
+
+      try {
+        Stat stat = new Stat();
+        LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // if the segment status is no longer DONE, or the download url is already fixed, skip the fix for this segment.
+        if (segmentZKMetadata.getStatus() != Status.DONE || !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+          continue;
+        }
+        // delay the fix to next round if not enough time elapsed since segment metadata update
+        if (!isExceededMinTimeToFixSegmentStoreCopy(stat)) {
+          segmentsNotFixed.offer(segmentName);
+          continue;
+        }
+        LOGGER.info("Fixing LLC segment {} whose segment store copy is unavailable", segmentName);
+
+        // Find servers which have online replica
+        List<URI> peerSegmentURIs = PeerServerSegmentFinder
+            .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
+        if (peerSegmentURIs.isEmpty()) {
+          throw new IllegalStateException(String.format("Failed to upload segment %s to segment store because no online replica is found", segmentName));
+        }
+
+        // Randomly ask one server to upload
+        URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
+        String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
+        LOGGER.info("Ask server to upload LLC segment {} to segment store by this path: {}", segmentName, serverUploadRequestUrl);
+        String segmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+        LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
+        // Update segment ZK metadata by adding the download URL
+        segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
+        persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, stat.getVersion());
+        LOGGER.info("Successfully uploaded LLC segment {} to segment store with download url: {}", segmentName, segmentDownloadUrl);
+      } catch (Exception e) {
+        segmentsNotFixed.offer(segmentName);
+        _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.NUMBER_LLC_SEGMENTS_DEEP_STORE_UPLOAD_FIX_ERROR, 1L);
+        LOGGER.error("Failed to upload segment {} to segment store", segmentName, e);

Review comment:
       If a segment is in the queue but the retention manager has deleted it, we will keep retrying it for a long time. We should make sure the retention manager also removes segments from the queue as they age out of retention.
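
One way to read this request, as a minimal sketch only: the retention path hands the names of the segments it deleted to a helper that drops them from the per-table upload queue. The class `UploadQueuePruneSketch`, the method `removeDeletedSegments`, and its parameters are hypothetical names for illustration; they are not part of the PR or of Pinot.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Queue;

// Hypothetical helper, not Pinot code: prunes deleted segments from a per-table queue.
class UploadQueuePruneSketch {
  // Returns true if at least one queued segment name was removed.
  static boolean removeDeletedSegments(Map<String, Queue<String>> segmentsByTable,
      String tableNameWithType, Collection<String> deletedSegments) {
    Queue<String> queue = segmentsByTable.get(tableNameWithType);
    if (queue == null) {
      // Nothing is queued for this table, so there is nothing to prune.
      return false;
    }
    // removeAll drops every queued name that also appears in deletedSegments.
    return queue.removeAll(deletedSegments);
  }
}
```

With a concurrent queue implementation, the retention task could call such a helper while the upload task is polling, so a deleted segment is not retried on the next round.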

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -122,6 +134,11 @@
    * The segment will be eligible for repairs by the validation manager, if the time  exceeds this value
    */
   private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 5 MINUTES
+  /**

Review comment:
       Nit: Can you match the comment style used in lines 129 to 135 (an asterisk at the start of each new line)? Thanks.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -157,6 +181,21 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
     }
     _tableConfigCache = new TableConfigCache(_propertyStore);
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
+    _isUploadingRealtimeMissingSegmentStoreCopyEnabled = controllerConf.isUploadingRealtimeMissingSegmentStoreCopyEnabled();
+    if (_isUploadingRealtimeMissingSegmentStoreCopyEnabled) {
+      _fileUploadDownloadClient = initFileUploadDownloadClient();
+      _llcSegmentMapForUpload = new ConcurrentHashMap<>();
+      _validationRangeForLLCSegmentsDeepStoreCopyMs = (long)controllerConf.getValidationRangeInDaysToCheckMissingSegmentStoreCopy() * 24 * 3600 * 1000;

Review comment:
       Use the `TimeUnit` class to convert to milliseconds, thanks.
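
A minimal sketch of the conversion, assuming the configured range is a whole number of days. `ValidationRangeSketch` and `daysToMillis` are hypothetical names used only for illustration; `TimeUnit.DAYS.toMillis` is the standard `java.util.concurrent` call.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper, not Pinot code: shows the TimeUnit-based conversion.
class ValidationRangeSketch {
  // Converts a day count to milliseconds without hand-written multiplication.
  static long daysToMillis(long days) {
    return TimeUnit.DAYS.toMillis(days);
  }

  public static void main(String[] args) {
    System.out.println(daysToMillis(3));
  }
}
```

Because `toMillis` takes and returns a `long`, the explicit `(long)` cast and the `24 * 3600 * 1000` arithmetic are no longer needed.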

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
##########
@@ -49,7 +49,8 @@
   NUMBER_TIMES_SCHEDULE_TASKS_CALLED("tasks", true),
   NUMBER_TASKS_SUBMITTED("tasks", false),
   NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
-  CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false);
+  CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
+  NUMBER_LLC_SEGMENTS_DEEP_STORE_UPLOAD_FIX_ERROR("llcSegmentDeepStoreUploadFixError", true);

Review comment:
       Please make this a gauge. You can set the gauge to the number of segments in `_llcSegmentMapForUpload`. It is best not to make it a per-table value, since all tables are likely to have these errors.
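
As a rough sketch of what the gauge value could be, assuming the map holds one queue of segment names per table: sum the queue sizes across all tables and publish that single number. `PendingUploadGaugeSketch` and `totalPendingSegments` are hypothetical names; how the value is reported to the controller metrics is left out on purpose.

```java
import java.util.Map;
import java.util.Queue;

// Hypothetical helper, not Pinot code: computes one cluster-wide gauge value.
class PendingUploadGaugeSketch {
  // Total number of segments still waiting for a deep store copy, across all tables.
  static long totalPendingSegments(Map<String, Queue<String>> segmentsByTable) {
    long total = 0;
    for (Queue<String> queue : segmentsByTable.values()) {
      total += queue.size();
    }
    return total;
  }
}
```

With concurrent collections the sum is only a point-in-time estimate, which is usually acceptable for a gauge.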

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -122,6 +134,11 @@
    * The segment will be eligible for repairs by the validation manager, if the time  exceeds this value
    */
   private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 5 MINUTES
+  /**
+   * Controller waits this amount of time before asking servers to upload LLC segments without deep store copy. The reason is after step 1 of segment completion is done (segment ZK metadata status changed to be DONE), servers are still in the process of loading segments. Only after that segments are in ONLINE status in external view for the controller to discover.

Review comment:
       ```suggestion
      * Controller waits this amount of time before asking servers to upload LLC segments without deep store copy.
      * The reason is that after step 1 of segment completion is done (segment ZK metadata status changed to
      * DONE), servers may still be in the process of loading segments. Only after that are segments in ONLINE
      * status in the external view for the controller to discover.
   ```

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download url
+          if (segmentZKMetadata.getStatus() == Status.DONE && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          LOGGER.error("failed to fetch the LLC segment {} ZK metadata", segmentName);

Review comment:
       This could be bad, right? If we cannot fetch the segment metadata here, that segment will be left without a deep store copy for a long time, until the next restart or leadership change. I think this needs a metric bumped.
   ```suggestion
             LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName);
   ```

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {

Review comment:
       I would prefer a method name like `isOlderThan` or `isNewerThan`.
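
A sketch of what such a rename might look like, keeping the same comparison as the original range check. `SegmentAgeSketch` and the parameter names are hypothetical, and the creation time is passed in directly rather than parsed from the LLC segment name.

```java
// Hypothetical helper, not Pinot code: names the age check after what it tests.
class SegmentAgeSketch {
  // True when the segment was created less than maxAgeMs before currentTimeMs.
  static boolean isNewerThan(long creationTimeMs, long currentTimeMs, long maxAgeMs) {
    return currentTimeMs - creationTimeMs < maxAgeMs;
  }
}
```

The call site then reads as a plain statement about segment age, for example skipping any segment for which `isNewerThan(...)` returns false.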

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download url
+          if (segmentZKMetadata.getStatus() == Status.DONE && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          LOGGER.error("failed to fetch the LLC segment {} ZK metadata", segmentName);
+        }
+      }
+  }
+
+  // Only validate recently created LLC segment for missing deep store download url. The time range check is based on segment name. This step helps to alleviate ZK access.
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < _validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add segment store download uri in ZK. Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers.
+   * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}: all segments are available in segment store.", realtimeTableName);

Review comment:
       In a cluster with thousands of tables, we will see this message constantly. Remove this log.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {

Review comment:
       We should check leadership for the table here. If this controller is not the leader, remove the table's segments from the queue.
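
A minimal sketch of the guard being asked for, with the leadership lookup stood in for by a plain predicate. `LeadershipGuardSketch`, `shouldProcess`, and the `isLeaderForTable` parameter are hypothetical; in the controller the predicate would presumably be backed by the lead controller manager.

```java
import java.util.Map;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical helper, not Pinot code: drops cached segments when leadership is lost.
class LeadershipGuardSketch {
  // Returns true if this controller should go on to process the table.
  static boolean shouldProcess(String tableNameWithType, Predicate<String> isLeaderForTable,
      Map<String, Queue<String>> segmentsByTable) {
    if (!isLeaderForTable.test(tableNameWithType)) {
      // Not the leader: forget the cached segments so another controller owns the fix.
      segmentsByTable.remove(tableNameWithType);
      return false;
    }
    return true;
  }
}
```

Removing the whole map entry, rather than emptying the queue, also keeps the cache from holding entries for tables this controller no longer leads.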

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download url
+          if (segmentZKMetadata.getStatus() == Status.DONE && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          LOGGER.error("failed to fetch the LLC segment {} ZK metadata", segmentName);
+        }
+      }
+  }
+
+  // Only validate recently created LLC segment for missing deep store download url. The time range check is based on segment name. This step helps to alleviate ZK access.
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < _validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add segment store download uri in ZK. Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers.
+   * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}: all segments are available in segment store.", realtimeTableName);
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+
+    // Iterate through LLC segments and upload missing segment store copy by following steps:
+    //  1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding segment store download url.
+    while (!segmentQueue.isEmpty()) {
+      String segmentName = segmentQueue.poll();
+      // Check if it's null in case of the while condition doesn't stand true anymore in the step of dequeue. Dequeue returns null if queue is empty.
+      if (segmentName == null) {
+        break;
+      }
+
+      try {
+        Stat stat = new Stat();
+        LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // if the segment status is no longer DONE, or the download url is already fixed, skip the fix for this segment.
+        if (segmentZKMetadata.getStatus() != Status.DONE || !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {

Review comment:
       A segment will not make it to the queue until the status is DONE. Why do we have this check?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download url
+          if (segmentZKMetadata.getStatus() == Status.DONE && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          LOGGER.error("failed to fetch the LLC segment {} ZK metadata", segmentName);
+        }
+      }
+  }
+
+  // Only validate recently created LLC segment for missing deep store download url. The time range check is based on segment name. This step helps to alleviate ZK access.
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < _validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add segment store download uri in ZK. Since uploading to segment store involves expensive compression step (first tar up the segment and then upload), we don't want to retry the uploading. Segment without segment store copy can still be downloaded from peer servers.
+   * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table {}: all segments are available in segment store.", realtimeTableName);
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+
+    // Iterate through LLC segments and upload missing segment store copy by following steps:
+    //  1. Ask servers which have online segment replica to upload to segment store. Servers return segment store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding segment store download url.
+    while (!segmentQueue.isEmpty()) {
+      String segmentName = segmentQueue.poll();
+      // Check if it's null in case of the while condition doesn't stand true anymore in the step of dequeue. Dequeue returns null if queue is empty.
+      if (segmentName == null) {
+        break;
+      }
+
+      try {
+        Stat stat = new Stat();
+        LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // if the segment status is no longer DONE, or the download url is already fixed, skip the fix for this segment.
+        if (segmentZKMetadata.getStatus() != Status.DONE || !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+          continue;

Review comment:
       Log a message here and at every point below as needed.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
##########
@@ -67,6 +67,22 @@ public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourc
     Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
   }
 
+  @Override
+  protected void setUpTask() {
+    // Prefetch the LLC segment without segment store copy from ZK, which helps to alleviate ZK access.

Review comment:
       Are you sure that controller leadership for a table is established by this time? We should do this in a callback from `LeadControllerManager`. And yes, this needs to be coded up; there is no such callback right now, AFAIK.
   
   @jackjlli can help.

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -727,6 +727,23 @@ public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream
     return uploadSegment(uri, segmentName, inputStream, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
+  /**
+   * Controller periodic task uses this endpoint to ask servers to upload committed llc segment to segment store if missing.
+   * @param uri The uri to ask servers to upload segment to segment store
+   * @return the uploaded segment download url from segment store
+   * @throws URISyntaxException
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public String uploadToSegmentStore(String uri)

Review comment:
       +1, I agree with Jackie. 
   



