You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/09/03 16:00:38 UTC

[GitHub] [pinot] yupeng9 commented on a change in pull request #6778: upload missing LLC segment to segment store by controller periodic task

yupeng9 commented on a change in pull request #6778:
URL: https://github.com/apache/pinot/pull/6778#discussion_r702006431



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1243,4 +1327,167 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          // Only fetch recently created LLC segment to alleviate ZK access.
+          // Validate segment creation time from segment name.
+          LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+          if (currentTimeMs - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+            continue;
+          }
+
+          SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without deep store download url
+          if (segmentZKMetadata.getStatus() == Status.DONE
+              && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          _controllerMetrics.addMeteredTableValue(tableNameWithType,
+              ControllerMeter.LLC_SEGMENTS_ZK_METADATA_PREFETCH_ERROR, 1L);
+          LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName);
+        }
+      }
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add deep store download uri in ZK.
+   * Since uploading to deep store involves expensive compression step (first tar up the segment and then upload),
+   * we don't want to retry the uploading. Segment without deep store copy can still be downloaded from peer servers.
+   *
+   * @see <a href="
+   * https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion
+   * "> By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   */
+  public void uploadToDeepStoreIfMissing(TableConfig tableConfig) {

Review comment:
       who calls this?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1243,4 +1327,167 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          // Only fetch recently created LLC segment to alleviate ZK access.
+          // Validate segment creation time from segment name.
+          LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+          if (currentTimeMs - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+            continue;
+          }
+
+          SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without deep store download url
+          if (segmentZKMetadata.getStatus() == Status.DONE
+              && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          _controllerMetrics.addMeteredTableValue(tableNameWithType,
+              ControllerMeter.LLC_SEGMENTS_ZK_METADATA_PREFETCH_ERROR, 1L);
+          LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName);
+        }
+      }
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add deep store download uri in ZK.
+   * Since uploading to deep store involves expensive compression step (first tar up the segment and then upload),
+   * we don't want to retry the uploading. Segment without deep store copy can still be downloaded from peer servers.
+   *
+   * @see <a href="
+   * https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion
+   * "> By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   */
+  public void uploadToDeepStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      return;
+    }
+
+    if (_isStopping) {
+      LOGGER.info(
+          "Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
+          realtimeTableName);
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+
+    long retentionMs =
+        TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase())
+            .toMillis(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+    RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+        TimeUnit.MILLISECONDS,
+        retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
+
+    // Iterate through LLC segments and upload missing deep store copy by following steps:
+    //  1. Ask servers which have online segment replica to upload to deep store.
+    //     Servers return deep store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding deep store download url.
+    while (!segmentQueue.isEmpty()) {
+      // TODO: Reevaluate the parallelism of upload operation. Currently the upload operation is conducted in
+      //  sequential order. Compared with parallel mode, it will take longer time but put less pressure on
+      //  servers. We may need to rate control the upload request if it is changed to be in parallel.
+      String segmentName = segmentQueue.poll();
+      try {
+        // Only fix recently created segment. Validate segment creation time based on name.
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        if (getCurrentTimeMs() - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+          LOGGER.info(
+              "Skipped fixing LLC segment {} which is created before deep store upload retry time range",
+              segmentName);
+          continue;
+        }
+
+        Stat stat = new Stat();
+        SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // If the download url is already fixed, skip the fix for this segment.
+        if (!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+          LOGGER.info(
+              "Skipped fixing LLC segment {} whose deep store download url is already available",
+              segmentName);
+          continue;
+        }
+        // Skip the fix for the segment if it is already out of retention.
+        if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+          LOGGER.info("Skipped fixing LLC segment {} which is already out of retention", segmentName);
+          continue;
+        }
+        // Delay the fix to next round if not enough time elapsed since segment metadata update
+        if (!isExceededMinTimeToFixDeepStoreCopy(stat)) {
+          segmentsNotFixed.offer(segmentName);
+          continue;
+        }
+        LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
+
+        // Find servers which have online replica
+        List<URI> peerSegmentURIs = PeerServerSegmentFinder
+            .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
+        if (peerSegmentURIs.isEmpty()) {
+          throw new IllegalStateException(
+              String.format(
+                  "Failed to upload segment %s to deep store because no online replica is found",
+                  segmentName));
+        }
+
+        // Randomly ask one server to upload
+        URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
+        String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
+        LOGGER.info(
+            "Ask server to upload LLC segment {} to deep store by this path: {}",
+            segmentName, serverUploadRequestUrl);
+        String segmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+        LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
+        // Update segment ZK metadata by adding the download URL
+        segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
+        persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, stat.getVersion());
+        LOGGER.info(
+            "Successfully uploaded LLC segment {} to deep store with download url: {}",
+            segmentName, segmentDownloadUrl);
+      } catch (Exception e) {
+        segmentsNotFixed.offer(segmentName);
+        _controllerMetrics.addMeteredTableValue(realtimeTableName,
+            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
+        LOGGER.error("Failed to upload segment {} to deep store", segmentName, e);

Review comment:
       nit: there could be two types of failures: the server fails to upload, or updating ZK fails. It would be helpful to distinguish these two cases.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1243,4 +1327,167 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          // Only fetch recently created LLC segment to alleviate ZK access.
+          // Validate segment creation time from segment name.
+          LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+          if (currentTimeMs - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+            continue;
+          }
+
+          SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without deep store download url
+          if (segmentZKMetadata.getStatus() == Status.DONE
+              && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          _controllerMetrics.addMeteredTableValue(tableNameWithType,
+              ControllerMeter.LLC_SEGMENTS_ZK_METADATA_PREFETCH_ERROR, 1L);
+          LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName);
+        }
+      }
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add deep store download uri in ZK.
+   * Since uploading to deep store involves expensive compression step (first tar up the segment and then upload),
+   * we don't want to retry the uploading. Segment without deep store copy can still be downloaded from peer servers.
+   *
+   * @see <a href="
+   * https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion
+   * "> By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   */
+  public void uploadToDeepStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      return;
+    }
+
+    if (_isStopping) {
+      LOGGER.info(
+          "Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
+          realtimeTableName);
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+
+    long retentionMs =
+        TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase())
+            .toMillis(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+    RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+        TimeUnit.MILLISECONDS,
+        retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
+
+    // Iterate through LLC segments and upload missing deep store copy by following steps:
+    //  1. Ask servers which have online segment replica to upload to deep store.
+    //     Servers return deep store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding deep store download url.
+    while (!segmentQueue.isEmpty()) {
+      // TODO: Reevaluate the parallelism of upload operation. Currently the upload operation is conducted in
+      //  sequential order. Compared with parallel mode, it will take longer time but put less pressure on
+      //  servers. We may need to rate control the upload request if it is changed to be in parallel.
+      String segmentName = segmentQueue.poll();
+      try {
+        // Only fix recently created segment. Validate segment creation time based on name.
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        if (getCurrentTimeMs() - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+          LOGGER.info(
+              "Skipped fixing LLC segment {} which is created before deep store upload retry time range",
+              segmentName);
+          continue;
+        }
+
+        Stat stat = new Stat();
+        SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // If the download url is already fixed, skip the fix for this segment.
+        if (!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+          LOGGER.info(
+              "Skipped fixing LLC segment {} whose deep store download url is already available",
+              segmentName);
+          continue;
+        }
+        // Skip the fix for the segment if it is already out of retention.
+        if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+          LOGGER.info("Skipped fixing LLC segment {} which is already out of retention", segmentName);
+          continue;
+        }
+        // Delay the fix to next round if not enough time elapsed since segment metadata update
+        if (!isExceededMinTimeToFixDeepStoreCopy(stat)) {
+          segmentsNotFixed.offer(segmentName);
+          continue;
+        }
+        LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
+
+        // Find servers which have online replica
+        List<URI> peerSegmentURIs = PeerServerSegmentFinder
+            .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
+        if (peerSegmentURIs.isEmpty()) {
+          throw new IllegalStateException(

Review comment:
       Why not log an error and proceed to the next item in the queue?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -160,6 +201,23 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
     }
     _tableConfigCache = new TableConfigCache(_propertyStore);
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
+    _isDeepStoreLLCSegmentUploadRetryEnabled =
+        controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
+    if (_isDeepStoreLLCSegmentUploadRetryEnabled) {
+      _fileUploadDownloadClient = initFileUploadDownloadClient();

Review comment:
       @npawar is there a plan to replace `FileUploadDownloadClient` with `SegmentUploader`?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1228,4 +1301,146 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          // Only fetch recently created LLC segment to alleviate ZK access. Validate segment creation time from segment name.
+          LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+          if (currentTimeMs - llcSegmentName.getCreationTimeMs() > _validationRangeForLLCSegmentsDeepStoreCopyMs) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download url
+          if (segmentZKMetadata.getStatus() == Status.DONE &&
+              CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);

Review comment:
       Sounds good. Also, we need a TODO to check and upload a specific table on demand.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1243,4 +1327,167 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          // Only fetch recently created LLC segment to alleviate ZK access.
+          // Validate segment creation time from segment name.
+          LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+          if (currentTimeMs - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+            continue;
+          }
+
+          SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without deep store download url
+          if (segmentZKMetadata.getStatus() == Status.DONE
+              && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          _controllerMetrics.addMeteredTableValue(tableNameWithType,
+              ControllerMeter.LLC_SEGMENTS_ZK_METADATA_PREFETCH_ERROR, 1L);
+          LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName);
+        }
+      }
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add deep store download uri in ZK.
+   * Since uploading to deep store involves expensive compression step (first tar up the segment and then upload),
+   * we don't want to retry the uploading. Segment without deep store copy can still be downloaded from peer servers.
+   *
+   * @see <a href="
+   * https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion
+   * "> By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   */
+  public void uploadToDeepStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      return;
+    }
+
+    if (_isStopping) {
+      LOGGER.info(
+          "Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
+          realtimeTableName);
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+
+    long retentionMs =
+        TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase())
+            .toMillis(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+    RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+        TimeUnit.MILLISECONDS,
+        retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
+
+    // Iterate through LLC segments and upload missing deep store copy by following steps:
+    //  1. Ask servers which have online segment replica to upload to deep store.
+    //     Servers return deep store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding deep store download url.
+    while (!segmentQueue.isEmpty()) {
+      // TODO: Reevaluate the parallelism of upload operation. Currently the upload operation is conducted in
+      //  sequential order. Compared with parallel mode, it will take longer time but put less pressure on
+      //  servers. We may need to rate control the upload request if it is changed to be in parallel.
+      String segmentName = segmentQueue.poll();
+      try {
+        // Only fix recently created segment. Validate segment creation time based on name.
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        if (getCurrentTimeMs() - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+          LOGGER.info(
+              "Skipped fixing LLC segment {} which is created before deep store upload retry time range",
+              segmentName);
+          continue;
+        }
+
+        Stat stat = new Stat();
+        SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // If the download url is already fixed, skip the fix for this segment.
+        if (!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+          LOGGER.info(
+              "Skipped fixing LLC segment {} whose deep store download url is already available",
+              segmentName);
+          continue;
+        }
+        // Skip the fix for the segment if it is already out of retention.
+        if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+          LOGGER.info("Skipped fixing LLC segment {} which is already out of retention", segmentName);
+          continue;
+        }
+        // Delay the fix to next round if not enough time elapsed since segment metadata update
+        if (!isExceededMinTimeToFixDeepStoreCopy(stat)) {
+          segmentsNotFixed.offer(segmentName);
+          continue;
+        }
+        LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
+
+        // Find servers which have online replica
+        List<URI> peerSegmentURIs = PeerServerSegmentFinder
+            .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
+        if (peerSegmentURIs.isEmpty()) {
+          throw new IllegalStateException(

Review comment:
       will this crash the thread?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org