You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/09/16 23:28:08 UTC

[pinot] branch master updated: upload missing LLC segment to segment store by controller periodic task (#6778)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6605bc0  upload missing LLC segment to segment store by controller periodic task (#6778)
6605bc0 is described below

commit 6605bc0aee6082654603cdc5c39ad148a2712e81
Author: Chang <33...@users.noreply.github.com>
AuthorDate: Thu Sep 16 16:27:52 2021 -0700

    upload missing LLC segment to segment store by controller periodic task (#6778)
    
    With the new LLC segment commit protocol (deep store bypass for realtime segment completion), an LLC segment can be considered committed even if its upload to the segment store failed. This change uses a controller periodic task (RealtimeSegmentValidationManager) to fix such upload failures by asking servers to upload the missing segments.
---
 .../pinot/common/metrics/ControllerMeter.java      |   4 +-
 .../common/utils/FileUploadDownloadClient.java     |  29 ++++
 .../apache/pinot/common/utils/LLCSegmentName.java  |  13 +-
 .../pinot/common/utils/SegmentNameBuilderTest.java |   3 +
 .../apache/pinot/controller/ControllerConf.java    |   9 ++
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 137 ++++++++++++++++++
 .../RealtimeSegmentValidationManager.java          |  26 ++--
 .../PinotLLCRealtimeSegmentManagerTest.java        | 157 +++++++++++++++++++++
 8 files changed, 364 insertions(+), 14 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 8d1c60f..55f1948 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -49,7 +49,9 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   NUMBER_TIMES_SCHEDULE_TASKS_CALLED("tasks", true),
   NUMBER_TASKS_SUBMITTED("tasks", false),
   NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
-  CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false);
+  CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
+  LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError", false);
+
 
   private final String _brokerMeterName;
   private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index c39d752..b185b9e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -797,6 +797,35 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   /**
+   * Asks a server to upload a committed LLC segment to the segment store.
+   *
+   * <p>Used by controllers: the controller periodic task calls this endpoint on a server to get a missing segment
+   * store copy of an LLC segment uploaded. The request is an HTTP/1.1 POST to {@code uri}, and the response body is
+   * the download url of the uploaded segment.
+   *
+   * @param uri the server endpoint to POST the upload request to
+   * @return the uploaded segment download url from segment store
+   * @throws URISyntaxException if {@code uri} cannot be parsed as a URI
+   * @throws IOException if the request fails with an I/O error
+   * @throws HttpErrorStatusException if the response status is rejected by {@code sendRequest}, or if the returned
+   *     download url is empty
+   *
+   * TODO: migrate this method to another class
+   */
+  public String uploadToSegmentStore(String uri)
+      throws URISyntaxException, IOException, HttpErrorStatusException {
+    URI serverUploadUri = new URI(uri);
+    RequestBuilder postRequestBuilder = RequestBuilder.post(serverUploadUri).setVersion(HttpVersion.HTTP_1_1);
+    setTimeout(postRequestBuilder, DEFAULT_SOCKET_TIMEOUT_MS);
+    // sendRequest() already checks the response status code; only the body needs validating here.
+    SimpleHttpResponse uploadResponse = sendRequest(postRequestBuilder.build());
+    String segmentDownloadUrl = uploadResponse.getResponse();
+    if (!segmentDownloadUrl.isEmpty()) {
+      return segmentDownloadUrl;
+    }
+    String errorMessage = String.format(
+        "Returned segment download url is empty after requesting servers to upload by the path: %s", uri);
+    throw new HttpErrorStatusException(errorMessage, uploadResponse.getStatusCode());
+  }
+
+  /**
    * Send segment uri.
    *
    * Note: table name has to be set as a parameter.
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
index a66bb3c..4bae626 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
@@ -20,11 +20,14 @@ package org.apache.pinot.common.utils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 
 public class LLCSegmentName extends SegmentName implements Comparable {
   private final static String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'";
+  private final static DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+
   private final String _tableName;
   private final int _partitionGroupId;
   private final int _sequenceNumber;
@@ -52,8 +55,7 @@ public class LLCSegmentName extends SegmentName implements Comparable {
     _partitionGroupId = partitionGroupId;
     _sequenceNumber = sequenceNumber;
     // ISO8601 date: 20160120T1234Z
-    DateTime dateTime = new DateTime(msSinceEpoch, DateTimeZone.UTC);
-    _creationTime = dateTime.toString(DATE_FORMAT);
+    _creationTime = DATE_FORMATTER.print(msSinceEpoch);
     _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
   }
 
@@ -93,6 +95,11 @@ public class LLCSegmentName extends SegmentName implements Comparable {
     return _creationTime;
   }
 
+  /**
+   * Returns the segment creation time in epoch milliseconds, parsed back from the UTC creation time string stored
+   * in the segment name. The string only has minute precision ("yyyyMMdd'T'HHmm'Z'"), so the result is truncated
+   * to the minute (seconds and milliseconds are always zero).
+   */
+  public long getCreationTimeMs() {
+    return DATE_FORMATTER.parseDateTime(_creationTime).getMillis();
+  }
+
   @Override
   public String getSegmentName() {
     return _segmentName;
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java
index 12dcf84..0fc15bc 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java
@@ -123,12 +123,14 @@ public class SegmentNameBuilderTest {
     final int sequenceNumber = 27;
     final long msSinceEpoch = 1466200248000L;
     final String creationTime = "20160617T2150Z";
+    final long creationTimeInMs = 1466200200000L;
     final String segmentName = "myTable__4__27__" + creationTime;
 
     LLCSegmentName segName1 = new LLCSegmentName(tableName, partitionGroupId, sequenceNumber, msSinceEpoch);
     Assert.assertEquals(segName1.getSegmentName(), segmentName);
     Assert.assertEquals(segName1.getPartitionGroupId(), partitionGroupId);
     Assert.assertEquals(segName1.getCreationTime(), creationTime);
+    Assert.assertEquals(segName1.getCreationTimeMs(), creationTimeInMs);
     Assert.assertEquals(segName1.getSequenceNumber(), sequenceNumber);
     Assert.assertEquals(segName1.getTableName(), tableName);
 
@@ -136,6 +138,7 @@ public class SegmentNameBuilderTest {
     Assert.assertEquals(segName2.getSegmentName(), segmentName);
     Assert.assertEquals(segName2.getPartitionGroupId(), partitionGroupId);
     Assert.assertEquals(segName2.getCreationTime(), creationTime);
+    Assert.assertEquals(segName2.getCreationTimeMs(), creationTimeInMs);
     Assert.assertEquals(segName2.getSequenceNumber(), sequenceNumber);
     Assert.assertEquals(segName2.getTableName(), tableName);
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index fc688d5..77bf264 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -173,6 +173,11 @@ public class ControllerConf extends PinotConfiguration {
     public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS =
         "controller.segmentRelocator.initialDelayInSeconds";
 
+    // Flag indicating whether the controller periodic task fixes LLC segments whose deep store copy is missing.
+    // Default value is false.
+    public static final String ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT =
+        "controller.realtime.segment.deepStoreUploadRetryEnabled";
+
     public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
     public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
 
@@ -758,6 +763,10 @@ public class ControllerConf extends PinotConfiguration {
         getPeriodicTaskInitialDelayInSeconds());
   }
 
+  /**
+   * Returns whether the controller periodic job should fix LLC segments whose deep store copy is missing.
+   * Reads {@code ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT}; defaults to {@code false}.
+   */
+  public boolean isDeepStoreRetryUploadLLCSegmentEnabled() {
+    final String configKey = ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT;
+    return getProperty(configKey, false);
+  }
+
   public long getPinotTaskManagerInitialDelaySeconds() {
     return getPeriodicTaskInitialDelayInSeconds();
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 849552d..6e72249 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -20,12 +20,14 @@ package org.apache.pinot.controller.helix.core.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -48,8 +50,10 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.SegmentName;
+import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.ControllerConf;
@@ -62,7 +66,10 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
+import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
+import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
+import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -70,6 +77,7 @@ import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -105,7 +113,10 @@ import org.slf4j.LoggerFactory;
  *   <li>commitSegmentMetadata(): From lead controller only</li>
  *   <li>segmentStoppedConsuming(): From lead controller only</li>
  *   <li>ensureAllPartitionsConsuming(): From lead controller only</li>
+ *   <li>uploadToDeepStoreIfMissing(): From lead controller only</li>
  * </ul>
+ *
+ * TODO: migrate code in this class to other places for better readability
  */
 public class PinotLLCRealtimeSegmentManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotLLCRealtimeSegmentManager.class);
@@ -125,6 +136,15 @@ public class PinotLLCRealtimeSegmentManager {
    * The segment will be eligible for repairs by the validation manager, if the time  exceeds this value
    */
   private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 5 MINUTES
+  /**
+   * When the controller asks a server to upload a missing LLC segment copy to deep store, the segment may be close
+   * to the end of its retention, and RetentionManager may purge it concurrently. To avoid this race, check whether
+   * the segment expires within this threshold (or has already expired) and, if so, skip the deep store fix.
+   * RetentionManager will delete such segments shortly anyway.
+   */
+  private static final long MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS =
+      60 * 60 * 1000L; // 1 hour
+  private static final Random RANDOM = new Random();
 
   private final HelixAdmin _helixAdmin;
   private final HelixManager _helixManager;
@@ -138,9 +158,11 @@ public class PinotLLCRealtimeSegmentManager {
   private final Lock[] _idealStateUpdateLocks;
   private final TableConfigCache _tableConfigCache;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private FileUploadDownloadClient _fileUploadDownloadClient;
 
   public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
       ControllerMetrics controllerMetrics) {
@@ -160,6 +182,20 @@ public class PinotLLCRealtimeSegmentManager {
     }
     _tableConfigCache = new TableConfigCache(_propertyStore);
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
+    _isDeepStoreLLCSegmentUploadRetryEnabled =
+        controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
+    if (_isDeepStoreLLCSegmentUploadRetryEnabled) {
+      _fileUploadDownloadClient = initFileUploadDownloadClient();
+    }
+  }
+
+  /**
+   * Returns whether deep store upload retry for LLC segments was enabled in the controller config when this manager
+   * was constructed. The upload client is only created when this is {@code true}.
+   */
+  public boolean isDeepStoreLLCSegmentUploadRetryEnabled() {
+    return this._isDeepStoreLLCSegmentUploadRetryEnabled;
+  }
+
+  /**
+   * Creates the client used to ask servers to upload segments to deep store. Package-private and
+   * {@code @VisibleForTesting}, presumably so tests can override it with a mock — TODO confirm.
+   */
+  @VisibleForTesting
+  FileUploadDownloadClient initFileUploadDownloadClient() {
+    FileUploadDownloadClient uploadClient = new FileUploadDownloadClient();
+    return uploadClient;
   }
 
   public boolean getIsSplitCommitEnabled() {
@@ -234,6 +270,14 @@ public class PinotLLCRealtimeSegmentManager {
       }
     }
     LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
+
+    if (_fileUploadDownloadClient != null) {
+      try {
+        _fileUploadDownloadClient.close();
+      } catch (IOException e) {
+        LOGGER.error("Failed to close fileUploadDownloadClient.");
+      }
+    }
   }
 
   /**
@@ -1243,4 +1287,97 @@ public class PinotLLCRealtimeSegmentManager {
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add deep store download uri in ZK.
+   * Since uploading to deep store involves expensive compression step (first tar up the segment and then upload),
+   * we don't want to retry the uploading. Segment without deep store copy can still be downloaded from peer servers.
+   *
+   * @see <a href="
+   * https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion
+   * "> By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   *
+   * TODO: Add an on-demand way to upload LLC segment to deep store for a specific table.
+   */
+  public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMetadata> segmentsZKMetadata) {
+    String realtimeTableName = tableConfig.getTableName();
+
+    if (_isStopping) {
+      LOGGER.info(
+          "Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
+          realtimeTableName);
+      return;
+    }
+
+    // Use this retention value to avoid the data racing between segment upload and retention management.
+    RetentionStrategy retentionStrategy = null;
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    if (validationConfig.getRetentionTimeUnit() != null && !validationConfig.getRetentionTimeUnit().isEmpty()
+        && validationConfig.getRetentionTimeValue() != null && !validationConfig.getRetentionTimeValue().isEmpty()) {
+      long retentionMs =
+          TimeUnit.valueOf(validationConfig.getRetentionTimeUnit().toUpperCase())
+              .toMillis(Long.parseLong(validationConfig.getRetentionTimeValue()));
+      retentionStrategy = new TimeRetentionStrategy(
+          TimeUnit.MILLISECONDS,
+          retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
+    }
+
+    // Iterate through LLC segments and upload missing deep store copy by following steps:
+    //  1. Ask servers which have online segment replica to upload to deep store.
+    //     Servers return deep store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding deep store download url.
+    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+      // TODO: Reevaluate the parallelism of upload operation. Currently the upload operation is conducted in
+      //  sequential order. Compared with parallel mode, it will take longer time but put less pressure on
+      //  servers. We may need to rate control the upload request if it is changed to be in parallel.
+      String segmentName = segmentZKMetadata.getSegmentName();
+      try {
+        if (!LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+          continue;
+        }
+        // Only fix the committed / externally uploaded llc segment without deep store copy
+        if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS
+            || !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+          continue;
+        }
+        // Skip the fix for the segment if it is already out of retention.
+        if (retentionStrategy != null && retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+          LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention",
+              segmentName);
+          continue;
+        }
+        LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
+
+        // Find servers which have online replica
+        List<URI> peerSegmentURIs = PeerServerSegmentFinder
+            .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
+        if (peerSegmentURIs.isEmpty()) {
+          throw new IllegalStateException(
+              String.format(
+                  "Failed to upload segment %s to deep store because no online replica is found",
+                  segmentName));
+        }
+
+        // Randomly ask one server to upload
+        URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
+        String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
+        LOGGER.info(
+            "Ask server to upload LLC segment {} to deep store by this path: {}",
+            segmentName, serverUploadRequestUrl);
+        String segmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+        LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
+        // Update segment ZK metadata by adding the download URL
+        segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
+        // TODO: add version check when persist segment ZK metadata
+        persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
+        LOGGER.info(
+            "Successfully uploaded LLC segment {} to deep store with download url: {}",
+            segmentName, segmentDownloadUrl);
+      } catch (Exception e) {
+        _controllerMetrics.addMeteredTableValue(realtimeTableName,
+            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
+        LOGGER.error("Failed to upload segment {} to deep store", segmentName, e);
+      }
+    }
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index e42f2a7..53291b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -43,7 +43,8 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Validates realtime ideal states and segment metadata, fixing any partitions which have stopped consuming
+ * Validates realtime ideal states and segment metadata, fixing any partitions which have stopped consuming,
+ * and uploading segments to deep store if segment download url is missing in the metadata.
  */
 public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<RealtimeSegmentValidationManager.Context> {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
@@ -52,7 +53,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   private final ValidationMetrics _validationMetrics;
 
   private final int _segmentLevelValidationIntervalInSeconds;
-  private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
+  private long _lastSegmentLevelValidationRunTimeMs = 0L;
 
   public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
@@ -70,13 +71,13 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   @Override
   protected Context preprocess() {
     Context context = new Context();
-    // Update realtime document counts only if certain time has passed after previous run
+    // Run segment level validation only if certain time has passed after previous run
     long currentTimeMs = System.currentTimeMillis();
-    if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastUpdateRealtimeDocumentCountTimeMs)
+    if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationRunTimeMs)
         >= _segmentLevelValidationIntervalInSeconds) {
       LOGGER.info("Run segment-level validation");
-      context._updateRealtimeDocumentCount = true;
-      _lastUpdateRealtimeDocumentCountTimeMs = currentTimeMs;
+      context._runSegmentLevelValidation = true;
+      _lastSegmentLevelValidationRunTimeMs = currentTimeMs;
     }
     return context;
   }
@@ -92,8 +93,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
         return;
       }
 
-      if (context._updateRealtimeDocumentCount) {
-        updateRealtimeDocumentCount(tableConfig);
+      if (context._runSegmentLevelValidation) {
+        runSegmentLevelValidation(tableConfig);
       }
 
       PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
@@ -104,7 +105,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
     }
   }
 
-  private void updateRealtimeDocumentCount(TableConfig tableConfig) {
+  private void runSegmentLevelValidation(TableConfig tableConfig) {
     String realtimeTableName = tableConfig.getTableName();
     List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
     boolean countHLCSegments = true;  // false if this table has ONLY LLC segments (i.e. fully migrated)
@@ -116,6 +117,11 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
     // Update the gauge to contain the total document count in the segments
     _validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
         computeRealtimeTotalDocumentInSegments(segmentsZKMetadata, countHLCSegments));
+
+    if (streamConfig.hasLowLevelConsumerType()
+        && _llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) {
+      _llcRealtimeSegmentManager.uploadToDeepStoreIfMissing(tableConfig, segmentsZKMetadata);
+    }
   }
 
   @VisibleForTesting
@@ -157,7 +163,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   }
 
   public static final class Context {
-    private boolean _updateRealtimeDocumentCount;
+    private boolean _runSegmentLevelValidation;
   }
 
   @VisibleForTesting
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 537a010..d5a2186 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,13 +35,23 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import javax.annotation.Nullable;
+import javax.ws.rs.core.Response;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -50,6 +61,7 @@ import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -73,6 +85,7 @@ import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.*;
@@ -912,6 +925,138 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Assert.assertEquals(segmentZKMetadata.getDownloadUrl(), "");
   }
 
+  /**
+   * Verifies uploadToDeepStoreIfMissing() fixes peer-download URLs of DONE segments via server upload.
+   */
+  @Test
+  public void testUploadToSegmentStore()
+      throws HttpErrorStatusException, IOException, URISyntaxException {
+    // mock the behavior for PinotHelixResourceManager
+    PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    HelixManager helixManager = mock(HelixManager.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
+    ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore =
+        (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
+    when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager);
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixManager.getClusterName()).thenReturn("cluster_name");
+    when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore);
+
+    // init fake PinotLLCRealtimeSegmentManager
+    ControllerConf controllerConfig = new ControllerConf();
+    controllerConfig.setProperty(
+        ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, controllerConfig);
+    Assert.assertTrue(segmentManager.isDeepStoreLLCSegmentUploadRetryEnabled());
+
+    // Set up a new table with 2 replicas, 5 instances, 5 partitions.
+    setUpNewTable(segmentManager, 2, 5, 5);
+    SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig =
+        new SegmentsValidationAndRetentionConfig();
+    segmentsValidationAndRetentionConfig.setRetentionTimeUnit(TimeUnit.DAYS.toString());
+    segmentsValidationAndRetentionConfig.setRetentionTimeValue("3");
+    segmentManager._tableConfig.setValidationConfig(segmentsValidationAndRetentionConfig);
+    List<SegmentZKMetadata> segmentsZKMetadata =
+        new ArrayList<>(segmentManager._segmentZKMetadataMap.values());
+    Assert.assertEquals(segmentsZKMetadata.size(), 5);
+
+    // Set up external view for this table
+    ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
+    when(helixAdmin.getResourceExternalView("cluster_name", REALTIME_TABLE_NAME))
+        .thenReturn(externalView);
+    when(helixAdmin.getConfigKeys(any(HelixConfigScope.class))).thenReturn(new ArrayList<>());
+    String adminPort = "2077";
+    Map<String, String> instanceConfigMap = new HashMap<>();
+    instanceConfigMap.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY, adminPort);
+    when(helixAdmin.getConfig(any(HelixConfigScope.class), anyList())).thenReturn(instanceConfigMap);
+
+    // Change 1st segment status to be DONE, but with default peer download url.
+    // Verify later the download url is fixed after upload success.
+    segmentsZKMetadata.get(0).setStatus(Status.DONE);
+    segmentsZKMetadata.get(0).setDownloadUrl(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+    // set up the external view for 1st segment
+    String instance0 = "instance0";
+    externalView.setState(segmentsZKMetadata.get(0).getSegmentName(), instance0, "ONLINE");
+    InstanceConfig instanceConfig0 = new InstanceConfig(instance0);
+    instanceConfig0.setHostName(instance0);
+    when(helixAdmin.getInstanceConfig(any(String.class), eq(instance0))).thenReturn(instanceConfig0);
+    // mock the request/response for 1st segment upload
+    String serverUploadRequestUrl0 = StringUtil
+        .join("/",
+            CommonConstants.HTTP_PROTOCOL + "://" + instance0 + ":" + adminPort,
+            "segments",
+            REALTIME_TABLE_NAME,
+            segmentsZKMetadata.get(0).getSegmentName(),
+            "upload");
+    String segmentDownloadUrl0 =
+        String.format("segmentDownloadUrl_%s", segmentsZKMetadata.get(0).getSegmentName());
+    when(segmentManager._mockedFileUploadDownloadClient
+        .uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(segmentDownloadUrl0);
+
+    // Change 2nd segment status to be DONE, but with default peer download url.
+    // Verify later the download url isn't fixed after upload failure.
+    segmentsZKMetadata.get(1).setStatus(Status.DONE);
+    segmentsZKMetadata.get(1).setDownloadUrl(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+    // set up the external view for 2nd segment
+    String instance1 = "instance1";
+    externalView.setState(segmentsZKMetadata.get(1).getSegmentName(), instance1, "ONLINE");
+    InstanceConfig instanceConfig1 = new InstanceConfig(instance1);
+    instanceConfig1.setHostName(instance1);
+    when(helixAdmin.getInstanceConfig(any(String.class), eq(instance1))).thenReturn(instanceConfig1);
+    // mock the request/response for 2nd segment upload
+    String serverUploadRequestUrl1 = StringUtil
+        .join("/",
+            CommonConstants.HTTP_PROTOCOL + "://" + instance1 + ":" + adminPort,
+            "segments",
+            REALTIME_TABLE_NAME,
+            segmentsZKMetadata.get(1).getSegmentName(),
+            "upload");
+    when(segmentManager._mockedFileUploadDownloadClient
+        .uploadToSegmentStore(serverUploadRequestUrl1))
+        .thenThrow(new HttpErrorStatusException(
+            "failed to upload segment", Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()));
+
+    // Change 3rd segment status to be DONE, but with default peer download url.
+    // Verify later the download url isn't fixed because no server hosts an ONLINE replica.
+    segmentsZKMetadata.get(2).setStatus(Status.DONE);
+    segmentsZKMetadata.get(2).setDownloadUrl(
+        CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+    // set up the external view for 3rd segment
+    String instance2 = "instance2";
+    externalView.setState(segmentsZKMetadata.get(2).getSegmentName(), instance2, "OFFLINE");
+
+    // Change 4th segment status to be DONE and with segment download url.
+    // Verify later the download url is still the same.
+    String defaultDownloadUrl = "canItBeDownloaded";
+    segmentsZKMetadata.get(3).setStatus(Status.DONE);
+    segmentsZKMetadata.get(3).setDownloadUrl(defaultDownloadUrl);
+
+    // Keep 5th segment status as IN_PROGRESS.
+
+    List<String> segmentNames = segmentsZKMetadata.stream()
+        .map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList());
+    when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME))
+        .thenReturn(segmentManager._tableConfig);
+
+    // Verify the result
+    segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, segmentsZKMetadata);
+    Assert.assertEquals(
+        segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getDownloadUrl(),
+        segmentDownloadUrl0);
+    Assert.assertEquals(
+        segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(1), null).getDownloadUrl(),
+        CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+    Assert.assertEquals(
+        segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(2), null).getDownloadUrl(),
+        CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+    Assert.assertEquals(
+        segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(3), null).getDownloadUrl(),
+        defaultDownloadUrl);
+    Assert.assertNull(
+        segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(4), null).getDownloadUrl());
+  }
+
   //////////////////////////////////////////////////////////////////////////////////
   // Fake classes
   /////////////////////////////////////////////////////////////////////////////////
@@ -934,11 +1079,16 @@ public class PinotLLCRealtimeSegmentManagerTest {
     int _numPartitions;
     List<PartitionGroupMetadata> _partitionGroupMetadataList = null;
     boolean _exceededMaxSegmentCompletionTime = false;
+    FileUploadDownloadClient _mockedFileUploadDownloadClient;
 
     FakePinotLLCRealtimeSegmentManager() {
       super(mock(PinotHelixResourceManager.class), CONTROLLER_CONF, mock(ControllerMetrics.class));
     }
 
+    FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf) {
+      super(helixResourceManager, controllerConf, mock(ControllerMetrics.class));  // metrics are always mocked
+    }
+
     void makeTableConfig() {
       Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
       _tableConfig =
@@ -968,6 +1118,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     @Override
+    FileUploadDownloadClient initFileUploadDownloadClient() {
+      // Keep a handle on the mock so tests can stub its uploadToSegmentStore() responses.
+      _mockedFileUploadDownloadClient = mock(FileUploadDownloadClient.class);
+      return _mockedFileUploadDownloadClient;
+    }
+
+    @Override
     public TableConfig getTableConfig(String realtimeTableName) {
       return _tableConfig;
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org