You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/09/14 00:10:59 UTC

[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7319: Move segment download and segment reload into TableManager

Jackie-Jiang commented on a change in pull request #7319:
URL: https://github.com/apache/pinot/pull/7319#discussion_r707781438



##########
File path: pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.HLCSegmentName;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class RealtimeTableDataManagerTest {
+  @Test
+  public void testAllowDownload() {
+    RealtimeTableDataManager mgr = new RealtimeTableDataManager(null);
+
+    final String groupId = "myTable_REALTIME_1234567_0";

Review comment:
       (code format) We don't usually mark local variables as `final`.

##########
File path: pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
##########
@@ -259,53 +255,50 @@ private void reloadSegment(String tableNameWithType, SegmentMetadata segmentMeta
       return;
     }
 
-    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
-    File parentFile = indexDir.getParentFile();
-    File segmentBackupDir =
-        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    SegmentZKMetadata zkMetadata =
+        ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
+    Preconditions.checkNotNull(zkMetadata);
 
     // This method might modify the file on disk. Use segment lock to prevent race condition
     Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName);
     try {
       segmentLock.lock();
 
-      // First rename index directory to segment backup directory so that original segment have all file descriptors
-      // point to the segment backup directory to ensure original segment serves queries properly
-
-      // Rename index directory to segment backup directory (atomic)
-      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
-          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+      // Reloads an existing segment, and the local segment metadata is existing as asserted above.
+      tableDataManager
+          .reloadSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig), zkMetadata,
+              segmentMetadata, schema, forceDownload);
+      LOGGER.info("Reloaded segment: {} of table: {}", segmentName, tableNameWithType);
+    } finally {
+      segmentLock.unlock();
+    }
+  }
 
-      // Copy from segment backup directory back to index directory
-      FileUtils.copyDirectory(segmentBackupDir, indexDir);
+  @Override
+  public void addOrReplaceSegment(String tableNameWithType, String segmentName)
+      throws Exception {
+    LOGGER.info("Adding or replacing segment: {} for table: {}", segmentName, tableNameWithType);
 
-      // Load from index directory
-      ImmutableSegment immutableSegment = ImmutableSegmentLoader
-          .load(indexDir, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig), schema);
+    // Get updated table config and segment metadata from Zookeeper.
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);

Review comment:
       Ideally, these should also be pushed down to the table data manager. But it seems we need the `_instanceDataManagerConfig` to create the `IndexLoadingConfig`, so pushing the logic down requires more refactoring.
   Maybe add a TODO for future reference?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +270,219 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, Schema schema, boolean forceDownload)
+      throws Exception {
+    File indexDir = localMetadata.getIndexDir();
+    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
+
+    File parentFile = indexDir.getParentFile();
+    File segmentBackupDir =
+        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+
+    try {
+      // First rename index directory to segment backup directory so that original segment have all file descriptors
+      // point to the segment backup directory to ensure original segment serves queries properly
+
+      // Rename index directory to segment backup directory (atomic)
+      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+
+      // Download from remote or copy from local backup directory into index directory,
+      // and then continue to load the segment from index directory.
+      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata);
+      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+        if (forceDownload) {
+          LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType);
+        } else {
+          LOGGER.info("Download segment:{} of table: {} as local crc:{} mismatches remote crc: {}", segmentName,
+              _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+        }
+        indexDir = downloadSegment(segmentName, zkMetadata);
+      } else {
+        LOGGER.info("Reload the local copy of segment: {} of table: {}", segmentName, _tableNameWithType);
+        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+      }
+
+      // Load from index directory and replace the old segment in memory.
+      addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+
+      // Rename segment backup directory to segment temporary directory (atomic)
+      // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we
+      // rename the segment backup directory to segment temporary directory, we know the reload already succeeded, so
+      // that we can safely delete the segment temporary directory
+      File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+      Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+          "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir,
+          segmentTempDir);
+      FileUtils.deleteDirectory(segmentTempDir);
+    } catch (Exception reloadFailureException) {
+      try {
+        LoaderUtils.reloadFailureRecovery(indexDir);
+      } catch (Exception recoveryFailureException) {
+        LOGGER.error("Failed to recover after reload failure", recoveryFailureException);
+        reloadFailureException.addSuppressed(recoveryFailureException);
+      }
+      throw reloadFailureException;
+    }
+  }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      throws Exception {
+    if (!isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;
+    }
+
+    // Try to recover if no local metadata is provided.
+    if (localMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
+      localMetadata = recoverSegmentQuietly(segmentName);
+      if (!isNewSegment(zkMetadata, localMetadata)) {
+        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
+            localMetadata.getCrc());
+        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
+          return;
+        }
+        localMetadata = null;
+      }
+    }
+
+    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download",
+        segmentName, _tableNameWithType);
+
+    // Download segment and replace the local one, either due to failure to recover local segment,
+    // or the segment data is updated and has new CRC now.
+    if (localMetadata == null) {
+      LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType);
+    } else {
+      LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName,
+          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    }
+    File indexDir = downloadSegment(segmentName, zkMetadata);
+    addSegment(indexDir, indexLoadingConfig);
+    LOGGER.info("Downloaded and replaced segment: {} of table: {} with local crc now becomes: {} and remote crc: {}",
+        segmentName, _tableNameWithType, new SegmentMetadataImpl(indexDir).getCrc(), zkMetadata.getCrc());
+  }
+
+  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
+    return true;
+  }
+
+  protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    // TODO: may support download from peer servers for RealTime table.
+    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
+  }
+
+  /**
+   * Server restart during segment reload might leave segment directory in inconsistent state, like the index
+   * directory might not exist but segment backup directory existed. This method tries to recover from reload
+   * failure before checking the existence of the index directory and loading segment metadata from it.
+   */
+  private SegmentMetadata recoverSegmentQuietly(String segmentName) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      LoaderUtils.reloadFailureRecovery(indexDir);
+      if (!indexDir.exists()) {
+        LOGGER.info("Segment: {} of table: {} is not found on disk", segmentName, _tableNameWithType);
+        return null;
+      }
+      SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
+      LOGGER.info("Recovered segment: {} of table: {} with crc: {} from disk", segmentName, _tableNameWithType,
+          localMetadata.getCrc());
+      return localMetadata;
+    } catch (Exception e) {
+      LOGGER.error("Failed to recover segment: {} of table: {} from disk", segmentName, _tableNameWithType, e);
+      FileUtils.deleteQuietly(indexDir);
+      return null;
+    }
+  }
+
+  private boolean loadSegmentQuietly(String segmentName, IndexLoadingConfig indexLoadingConfig) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      addSegment(indexDir, indexLoadingConfig);
+      LOGGER.info("Loaded segment: {} of table: {} from disk", segmentName, _tableNameWithType);
+      return true;
+    } catch (Exception e) {
+      FileUtils.deleteQuietly(indexDir);
+      LOGGER.error("Failed to load segment: {} of table: {} from disk", segmentName, _tableNameWithType, e);
+      return false;
+    }
+  }
+
+  private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    File tempRootDir = getSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
+    FileUtils.forceMkdir(tempRootDir);
+    try {
+      File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir);
+      return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
+    } finally {
+      FileUtils.deleteQuietly(tempRootDir);
+    }
+  }
+
+  @VisibleForTesting
+  File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir)
+      throws Exception {
+    File tarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    String uri = zkMetadata.getDownloadUrl();
+    try {
+      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, zkMetadata.getCrypterName());
+      LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName,
+          _tableNameWithType, uri, tarFile, tarFile.length());
+      return tarFile;
+    } catch (AttemptsExceededException e) {
+      LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName,
+          _tableNameWithType, uri, tarFile);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
+      Utils.rethrowException(e);
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  File untarAndMoveSegment(String segmentName, File tarFile, File tempRootDir) {
+    File untarDir = new File(tempRootDir, segmentName);
+    try {
+      // If an exception is thrown when untarring, it means the tar file is broken
+      // or not found after the retry. Thus, there's no need to retry again.
+      File untaredSegDir = TarGzCompressionUtils.untar(tarFile, untarDir).get(0);
+      LOGGER.info("Uncompressed tar file: {} into target dir: {}", tarFile, untarDir);
+      // Replace the existing index directory.
+      File indexDir = getSegmentDataDir(segmentName);
+      FileUtils.deleteDirectory(indexDir);
+      FileUtils.moveDirectory(untaredSegDir, indexDir);
+      LOGGER.info("Successfully downloaded segment: {} of table: {} to index dir: {}", segmentName, _tableNameWithType,
+          indexDir);
+      return indexDir;
+    } catch (Exception e) {
+      LOGGER.error("Failed to untar segment: {} of table: {} from: {} to: {}", segmentName, _tableNameWithType, tarFile,
+          untarDir);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UNTAR_FAILURES, 1L);
+      Utils.rethrowException(e);

Review comment:
       Also add `throws Exception` to the method signature so the exception can be rethrown directly:
   ```suggestion
         throw e;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +270,219 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, Schema schema, boolean forceDownload)
+      throws Exception {
+    File indexDir = localMetadata.getIndexDir();
+    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
+
+    File parentFile = indexDir.getParentFile();
+    File segmentBackupDir =
+        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+
+    try {
+      // First rename index directory to segment backup directory so that original segment have all file descriptors
+      // point to the segment backup directory to ensure original segment serves queries properly
+
+      // Rename index directory to segment backup directory (atomic)
+      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+
+      // Download from remote or copy from local backup directory into index directory,
+      // and then continue to load the segment from index directory.
+      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata);
+      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+        if (forceDownload) {
+          LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType);
+        } else {
+          LOGGER.info("Download segment:{} of table: {} as local crc:{} mismatches remote crc: {}", segmentName,
+              _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+        }
+        indexDir = downloadSegment(segmentName, zkMetadata);
+      } else {
+        LOGGER.info("Reload the local copy of segment: {} of table: {}", segmentName, _tableNameWithType);
+        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+      }
+
+      // Load from index directory and replace the old segment in memory.
+      addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+
+      // Rename segment backup directory to segment temporary directory (atomic)
+      // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we
+      // rename the segment backup directory to segment temporary directory, we know the reload already succeeded, so
+      // that we can safely delete the segment temporary directory
+      File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+      Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+          "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir,
+          segmentTempDir);
+      FileUtils.deleteDirectory(segmentTempDir);
+    } catch (Exception reloadFailureException) {
+      try {
+        LoaderUtils.reloadFailureRecovery(indexDir);
+      } catch (Exception recoveryFailureException) {
+        LOGGER.error("Failed to recover after reload failure", recoveryFailureException);
+        reloadFailureException.addSuppressed(recoveryFailureException);
+      }
+      throw reloadFailureException;
+    }
+  }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      throws Exception {
+    if (!isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;
+    }
+
+    // Try to recover if no local metadata is provided.
+    if (localMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
+      localMetadata = recoverSegmentQuietly(segmentName);
+      if (!isNewSegment(zkMetadata, localMetadata)) {
+        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
+            localMetadata.getCrc());
+        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
+          return;
+        }
+        localMetadata = null;
+      }
+    }
+
+    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download",
+        segmentName, _tableNameWithType);
+
+    // Download segment and replace the local one, either due to failure to recover local segment,
+    // or the segment data is updated and has new CRC now.
+    if (localMetadata == null) {
+      LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType);
+    } else {
+      LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName,
+          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    }
+    File indexDir = downloadSegment(segmentName, zkMetadata);
+    addSegment(indexDir, indexLoadingConfig);
+    LOGGER.info("Downloaded and replaced segment: {} of table: {} with local crc now becomes: {} and remote crc: {}",

Review comment:
       Avoid creating another copy of the segment metadata here (`new SegmentMetadataImpl(indexDir).getCrc()`). The local CRC should always match the remote CRC at this point.
   Also, the segment might be newly added rather than replaced, so the log message should not always say "replaced".

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +270,219 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, Schema schema, boolean forceDownload)
+      throws Exception {
+    File indexDir = localMetadata.getIndexDir();
+    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
+
+    File parentFile = indexDir.getParentFile();
+    File segmentBackupDir =
+        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+
+    try {
+      // First rename index directory to segment backup directory so that original segment have all file descriptors
+      // point to the segment backup directory to ensure original segment serves queries properly
+
+      // Rename index directory to segment backup directory (atomic)
+      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+
+      // Download from remote or copy from local backup directory into index directory,
+      // and then continue to load the segment from index directory.
+      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata);
+      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+        if (forceDownload) {
+          LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType);
+        } else {
+          LOGGER.info("Download segment:{} of table: {} as local crc:{} mismatches remote crc: {}", segmentName,
+              _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+        }
+        indexDir = downloadSegment(segmentName, zkMetadata);
+      } else {
+        LOGGER.info("Reload the local copy of segment: {} of table: {}", segmentName, _tableNameWithType);
+        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+      }
+
+      // Load from index directory and replace the old segment in memory.
+      addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+
+      // Rename segment backup directory to segment temporary directory (atomic)
+      // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we
+      // rename the segment backup directory to segment temporary directory, we know the reload already succeeded, so
+      // that we can safely delete the segment temporary directory
+      File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+      Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+          "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir,
+          segmentTempDir);
+      FileUtils.deleteDirectory(segmentTempDir);
+    } catch (Exception reloadFailureException) {
+      try {
+        LoaderUtils.reloadFailureRecovery(indexDir);
+      } catch (Exception recoveryFailureException) {
+        LOGGER.error("Failed to recover after reload failure", recoveryFailureException);
+        reloadFailureException.addSuppressed(recoveryFailureException);
+      }
+      throw reloadFailureException;
+    }
+  }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      throws Exception {
+    if (!isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;
+    }
+
+    // Try to recover if no local metadata is provided.
+    if (localMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
+      localMetadata = recoverSegmentQuietly(segmentName);
+      if (!isNewSegment(zkMetadata, localMetadata)) {
+        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
+            localMetadata.getCrc());
+        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
+          return;
+        }
+        localMetadata = null;

Review comment:
       Add some comments on why we set the local metadata to `null` here

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +270,219 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, Schema schema, boolean forceDownload)
+      throws Exception {
+    File indexDir = localMetadata.getIndexDir();
+    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
+
+    File parentFile = indexDir.getParentFile();
+    File segmentBackupDir =
+        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+
+    try {
+      // First rename index directory to segment backup directory so that original segment have all file descriptors
+      // point to the segment backup directory to ensure original segment serves queries properly
+
+      // Rename index directory to segment backup directory (atomic)
+      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+
+      // Download from remote or copy from local backup directory into index directory,
+      // and then continue to load the segment from index directory.
+      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata);
+      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+        if (forceDownload) {
+          LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType);
+        } else {
+          LOGGER.info("Download segment:{} of table: {} as local crc:{} mismatches remote crc: {}", segmentName,
+              _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+        }
+        indexDir = downloadSegment(segmentName, zkMetadata);
+      } else {
+        LOGGER.info("Reload the local copy of segment: {} of table: {}", segmentName, _tableNameWithType);
+        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+      }
+
+      // Load from index directory and replace the old segment in memory.
+      addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+
+      // Rename segment backup directory to segment temporary directory (atomic)
+      // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we
+      // rename the segment backup directory to segment temporary directory, we know the reload already succeeded, so
+      // that we can safely delete the segment temporary directory
+      File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+      Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+          "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir,
+          segmentTempDir);
+      FileUtils.deleteDirectory(segmentTempDir);
+    } catch (Exception reloadFailureException) {
+      try {
+        LoaderUtils.reloadFailureRecovery(indexDir);
+      } catch (Exception recoveryFailureException) {
+        LOGGER.error("Failed to recover after reload failure", recoveryFailureException);
+        reloadFailureException.addSuppressed(recoveryFailureException);
+      }
+      throw reloadFailureException;
+    }
+  }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      throws Exception {
+    if (!isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;
+    }
+
+    // Try to recover if no local metadata is provided.
+    if (localMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
+      localMetadata = recoverSegmentQuietly(segmentName);
+      if (!isNewSegment(zkMetadata, localMetadata)) {
+        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
+            localMetadata.getCrc());
+        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
+          return;
+        }
+        localMetadata = null;
+      }
+    }
+
+    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download",
+        segmentName, _tableNameWithType);
+
+    // Download segment and replace the local one, either due to failure to recover local segment,
+    // or the segment data is updated and has new CRC now.
+    if (localMetadata == null) {
+      LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType);
+    } else {
+      LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName,
+          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    }
+    File indexDir = downloadSegment(segmentName, zkMetadata);
+    addSegment(indexDir, indexLoadingConfig);
+    LOGGER.info("Downloaded and replaced segment: {} of table: {} with local crc now becomes: {} and remote crc: {}",
+        segmentName, _tableNameWithType, new SegmentMetadataImpl(indexDir).getCrc(), zkMetadata.getCrc());
+  }
+
+  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
+    return true;
+  }
+
+  protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    // TODO: may support download from peer servers for RealTime table.
+    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
+  }
+
+  /**
+   * Server restart during segment reload might leave segment directory in inconsistent state, like the index
+   * directory might not exist but segment backup directory existed. This method tries to recover from reload
+   * failure before checking the existence of the index directory and loading segment metadata from it.
+   */
+  private SegmentMetadata recoverSegmentQuietly(String segmentName) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      LoaderUtils.reloadFailureRecovery(indexDir);
+      if (!indexDir.exists()) {
+        LOGGER.info("Segment: {} of table: {} is not found on disk", segmentName, _tableNameWithType);
+        return null;
+      }
+      SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
+      LOGGER.info("Recovered segment: {} of table: {} with crc: {} from disk", segmentName, _tableNameWithType,
+          localMetadata.getCrc());
+      return localMetadata;
+    } catch (Exception e) {
+      LOGGER.error("Failed to recover segment: {} of table: {} from disk", segmentName, _tableNameWithType, e);
+      FileUtils.deleteQuietly(indexDir);
+      return null;
+    }
+  }
+
+  private boolean loadSegmentQuietly(String segmentName, IndexLoadingConfig indexLoadingConfig) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      addSegment(indexDir, indexLoadingConfig);
+      LOGGER.info("Loaded segment: {} of table: {} from disk", segmentName, _tableNameWithType);
+      return true;
+    } catch (Exception e) {
+      FileUtils.deleteQuietly(indexDir);
+      LOGGER.error("Failed to load segment: {} of table: {} from disk", segmentName, _tableNameWithType, e);
+      return false;
+    }
+  }
+
+  private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    File tempRootDir = getSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
+    FileUtils.forceMkdir(tempRootDir);
+    try {
+      File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir);
+      return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
+    } finally {
+      FileUtils.deleteQuietly(tempRootDir);
+    }
+  }
+
+  @VisibleForTesting
+  File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir)
+      throws Exception {
+    File tarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    String uri = zkMetadata.getDownloadUrl();
+    try {
+      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, zkMetadata.getCrypterName());
+      LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName,
+          _tableNameWithType, uri, tarFile, tarFile.length());
+      return tarFile;
+    } catch (AttemptsExceededException e) {
+      LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName,
+          _tableNameWithType, uri, tarFile);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
+      Utils.rethrowException(e);

Review comment:
       ```suggestion
         throw e;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +270,219 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, Schema schema, boolean forceDownload)

Review comment:
       `schema` is nullable here; please annotate the parameter with `@Nullable`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +270,219 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, Schema schema, boolean forceDownload)
+      throws Exception {
+    File indexDir = localMetadata.getIndexDir();
+    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
+
+    File parentFile = indexDir.getParentFile();
+    File segmentBackupDir =
+        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+
+    try {
+      // First rename index directory to segment backup directory so that original segment have all file descriptors
+      // point to the segment backup directory to ensure original segment serves queries properly
+
+      // Rename index directory to segment backup directory (atomic)
+      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+
+      // Download from remote or copy from local backup directory into index directory,
+      // and then continue to load the segment from index directory.
+      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata);
+      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+        if (forceDownload) {
+          LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType);
+        } else {
+          LOGGER.info("Download segment:{} of table: {} as local crc:{} mismatches remote crc: {}", segmentName,

Review comment:
       (nit)
   ```suggestion
             LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}", segmentName,
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +270,219 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, Schema schema, boolean forceDownload)
+      throws Exception {
+    File indexDir = localMetadata.getIndexDir();
+    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
+
+    File parentFile = indexDir.getParentFile();
+    File segmentBackupDir =
+        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+
+    try {
+      // First rename index directory to segment backup directory so that original segment have all file descriptors
+      // point to the segment backup directory to ensure original segment serves queries properly
+
+      // Rename index directory to segment backup directory (atomic)
+      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+
+      // Download from remote or copy from local backup directory into index directory,
+      // and then continue to load the segment from index directory.
+      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata);
+      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+        if (forceDownload) {
+          LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType);
+        } else {
+          LOGGER.info("Download segment:{} of table: {} as local crc:{} mismatches remote crc: {}", segmentName,
+              _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+        }
+        indexDir = downloadSegment(segmentName, zkMetadata);
+      } else {
+        LOGGER.info("Reload the local copy of segment: {} of table: {}", segmentName, _tableNameWithType);
+        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+      }
+
+      // Load from index directory and replace the old segment in memory.
+      addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+
+      // Rename segment backup directory to segment temporary directory (atomic)
+      // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we
+      // rename the segment backup directory to segment temporary directory, we know the reload already succeeded, so
+      // that we can safely delete the segment temporary directory
+      File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+      Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+          "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir,
+          segmentTempDir);
+      FileUtils.deleteDirectory(segmentTempDir);
+    } catch (Exception reloadFailureException) {
+      try {
+        LoaderUtils.reloadFailureRecovery(indexDir);
+      } catch (Exception recoveryFailureException) {
+        LOGGER.error("Failed to recover after reload failure", recoveryFailureException);
+        reloadFailureException.addSuppressed(recoveryFailureException);
+      }
+      throw reloadFailureException;
+    }
+  }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      throws Exception {
+    if (!isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;
+    }
+
+    // Try to recover if no local metadata is provided.
+    if (localMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
+      localMetadata = recoverSegmentQuietly(segmentName);
+      if (!isNewSegment(zkMetadata, localMetadata)) {
+        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
+            localMetadata.getCrc());
+        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
+          return;
+        }
+        localMetadata = null;
+      }
+    }
+
+    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download",
+        segmentName, _tableNameWithType);
+
+    // Download segment and replace the local one, either due to failure to recover local segment,
+    // or the segment data is updated and has new CRC now.
+    if (localMetadata == null) {
+      LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType);
+    } else {
+      LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName,
+          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    }
+    File indexDir = downloadSegment(segmentName, zkMetadata);
+    addSegment(indexDir, indexLoadingConfig);
+    LOGGER.info("Downloaded and replaced segment: {} of table: {} with local crc now becomes: {} and remote crc: {}",
+        segmentName, _tableNameWithType, new SegmentMetadataImpl(indexDir).getCrc(), zkMetadata.getCrc());
+  }
+
+  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
+    return true;
+  }
+
+  protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    // TODO: may support download from peer servers for RealTime table.
+    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
+  }
+
+  /**
+   * Server restart during segment reload might leave segment directory in inconsistent state, like the index
+   * directory might not exist but segment backup directory existed. This method tries to recover from reload
+   * failure before checking the existence of the index directory and loading segment metadata from it.
+   */
+  private SegmentMetadata recoverSegmentQuietly(String segmentName) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      LoaderUtils.reloadFailureRecovery(indexDir);
+      if (!indexDir.exists()) {
+        LOGGER.info("Segment: {} of table: {} is not found on disk", segmentName, _tableNameWithType);
+        return null;
+      }
+      SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
+      LOGGER.info("Recovered segment: {} of table: {} with crc: {} from disk", segmentName, _tableNameWithType,

Review comment:
       This log message is misleading. The segment might not have been recovered at all; it may simply exist on disk and not be loaded into memory yet.

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
##########
@@ -78,6 +81,27 @@ void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
   void addSegment(String segmentName, TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig)
       throws Exception;
 
+  /**
+   * Reloads an existing immutable segment for the table, which can be an OFFLINE or REALTIME table.
+   * A new segment may be downloaded if the local one has a different CRC; or can be forced to download
+   * if forceDownload flag is true. This operation is conducted within a failure handling framework
+   * and made transparent to ongoing queries, because the segment is in online serving state.
+   */
+  void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, Schema schema, boolean forceDownload)

Review comment:
       Is `schema` nullable? If so, please annotate it with `@Nullable`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +270,219 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, Schema schema, boolean forceDownload)
+      throws Exception {
+    File indexDir = localMetadata.getIndexDir();
+    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
+
+    File parentFile = indexDir.getParentFile();
+    File segmentBackupDir =
+        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+
+    try {
+      // First rename index directory to segment backup directory so that original segment have all file descriptors
+      // point to the segment backup directory to ensure original segment serves queries properly
+
+      // Rename index directory to segment backup directory (atomic)
+      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+
+      // Download from remote or copy from local backup directory into index directory,
+      // and then continue to load the segment from index directory.
+      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata);
+      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+        if (forceDownload) {
+          LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType);
+        } else {
+          LOGGER.info("Download segment:{} of table: {} as local crc:{} mismatches remote crc: {}", segmentName,
+              _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+        }
+        indexDir = downloadSegment(segmentName, zkMetadata);
+      } else {
+        LOGGER.info("Reload the local copy of segment: {} of table: {}", segmentName, _tableNameWithType);
+        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+      }
+
+      // Load from index directory and replace the old segment in memory.
+      addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+
+      // Rename segment backup directory to segment temporary directory (atomic)
+      // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we
+      // rename the segment backup directory to segment temporary directory, we know the reload already succeeded, so
+      // that we can safely delete the segment temporary directory
+      File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+      Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+          "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir,
+          segmentTempDir);
+      FileUtils.deleteDirectory(segmentTempDir);
+    } catch (Exception reloadFailureException) {
+      try {
+        LoaderUtils.reloadFailureRecovery(indexDir);
+      } catch (Exception recoveryFailureException) {
+        LOGGER.error("Failed to recover after reload failure", recoveryFailureException);
+        reloadFailureException.addSuppressed(recoveryFailureException);
+      }
+      throw reloadFailureException;
+    }
+  }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      throws Exception {
+    if (!isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;
+    }
+
+    // Try to recover if no local metadata is provided.
+    if (localMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
+      localMetadata = recoverSegmentQuietly(segmentName);
+      if (!isNewSegment(zkMetadata, localMetadata)) {
+        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
+            localMetadata.getCrc());
+        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
+          return;
+        }
+        localMetadata = null;
+      }
+    }
+
+    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download",
+        segmentName, _tableNameWithType);
+
+    // Download segment and replace the local one, either due to failure to recover local segment,
+    // or the segment data is updated and has new CRC now.
+    if (localMetadata == null) {
+      LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType);
+    } else {
+      LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName,
+          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    }
+    File indexDir = downloadSegment(segmentName, zkMetadata);
+    addSegment(indexDir, indexLoadingConfig);
+    LOGGER.info("Downloaded and replaced segment: {} of table: {} with local crc now becomes: {} and remote crc: {}",
+        segmentName, _tableNameWithType, new SegmentMetadataImpl(indexDir).getCrc(), zkMetadata.getCrc());
+  }
+
+  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
+    return true;
+  }
+
+  protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    // TODO: may support download from peer servers for RealTime table.
+    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
+  }
+
+  /**
+   * Server restart during segment reload might leave segment directory in inconsistent state, like the index
+   * directory might not exist but segment backup directory existed. This method tries to recover from reload
+   * failure before checking the existence of the index directory and loading segment metadata from it.
+   */
+  private SegmentMetadata recoverSegmentQuietly(String segmentName) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      LoaderUtils.reloadFailureRecovery(indexDir);
+      if (!indexDir.exists()) {
+        LOGGER.info("Segment: {} of table: {} is not found on disk", segmentName, _tableNameWithType);
+        return null;
+      }
+      SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
+      LOGGER.info("Recovered segment: {} of table: {} with crc: {} from disk", segmentName, _tableNameWithType,
+          localMetadata.getCrc());
+      return localMetadata;
+    } catch (Exception e) {
+      LOGGER.error("Failed to recover segment: {} of table: {} from disk", segmentName, _tableNameWithType, e);
+      FileUtils.deleteQuietly(indexDir);
+      return null;
+    }
+  }
+
+  private boolean loadSegmentQuietly(String segmentName, IndexLoadingConfig indexLoadingConfig) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      addSegment(indexDir, indexLoadingConfig);
+      LOGGER.info("Loaded segment: {} of table: {} from disk", segmentName, _tableNameWithType);
+      return true;
+    } catch (Exception e) {
+      FileUtils.deleteQuietly(indexDir);
+      LOGGER.error("Failed to load segment: {} of table: {} from disk", segmentName, _tableNameWithType, e);
+      return false;
+    }
+  }
+
+  private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    File tempRootDir = getSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
+    FileUtils.forceMkdir(tempRootDir);
+    try {
+      File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir);
+      return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
+    } finally {
+      FileUtils.deleteQuietly(tempRootDir);
+    }
+  }
+
+  @VisibleForTesting
+  File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir)
+      throws Exception {
+    File tarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    String uri = zkMetadata.getDownloadUrl();
+    try {
+      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, zkMetadata.getCrypterName());
+      LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName,
+          _tableNameWithType, uri, tarFile, tarFile.length());
+      return tarFile;
+    } catch (AttemptsExceededException e) {
+      LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName,
+          _tableNameWithType, uri, tarFile);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
+      Utils.rethrowException(e);
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  File untarAndMoveSegment(String segmentName, File tarFile, File tempRootDir) {
+    File untarDir = new File(tempRootDir, segmentName);
+    try {
+      // If an exception is thrown when untarring, it means the tar file is broken
+      // or not found after the retry. Thus, there's no need to retry again.
+      File untaredSegDir = TarGzCompressionUtils.untar(tarFile, untarDir).get(0);
+      LOGGER.info("Uncompressed tar file: {} into target dir: {}", tarFile, untarDir);
+      // Replace the existing index directory.
+      File indexDir = getSegmentDataDir(segmentName);
+      FileUtils.deleteDirectory(indexDir);
+      FileUtils.moveDirectory(untaredSegDir, indexDir);
+      LOGGER.info("Successfully downloaded segment: {} of table: {} to index dir: {}", segmentName, _tableNameWithType,
+          indexDir);
+      return indexDir;
+    } catch (Exception e) {
+      LOGGER.error("Failed to untar segment: {} of table: {} from: {} to: {}", segmentName, _tableNameWithType, tarFile,
+          untarDir);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UNTAR_FAILURES, 1L);
+      Utils.rethrowException(e);
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  File getSegmentDataDir(String segmentName) {
+    return new File(_indexDir, segmentName);
+  }
+
+  @VisibleForTesting
+  static boolean isNewSegment(SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata) {

Review comment:
       Annotate `localMetadata` as `@Nullable`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +270,219 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, Schema schema, boolean forceDownload)
+      throws Exception {
+    File indexDir = localMetadata.getIndexDir();
+    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
+
+    File parentFile = indexDir.getParentFile();
+    File segmentBackupDir =
+        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+
+    try {
+      // First rename index directory to segment backup directory so that original segment have all file descriptors
+      // point to the segment backup directory to ensure original segment serves queries properly
+
+      // Rename index directory to segment backup directory (atomic)
+      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+
+      // Download from remote or copy from local backup directory into index directory,
+      // and then continue to load the segment from index directory.
+      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata);
+      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+        if (forceDownload) {
+          LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType);
+        } else {
+          LOGGER.info("Download segment:{} of table: {} as local crc:{} mismatches remote crc: {}", segmentName,
+              _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+        }
+        indexDir = downloadSegment(segmentName, zkMetadata);
+      } else {
+        LOGGER.info("Reload the local copy of segment: {} of table: {}", segmentName, _tableNameWithType);
+        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+      }
+
+      // Load from index directory and replace the old segment in memory.
+      addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+
+      // Rename segment backup directory to segment temporary directory (atomic)
+      // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we
+      // rename the segment backup directory to segment temporary directory, we know the reload already succeeded, so
+      // that we can safely delete the segment temporary directory
+      File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+      Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+          "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir,
+          segmentTempDir);
+      FileUtils.deleteDirectory(segmentTempDir);
+    } catch (Exception reloadFailureException) {
+      try {
+        LoaderUtils.reloadFailureRecovery(indexDir);
+      } catch (Exception recoveryFailureException) {
+        LOGGER.error("Failed to recover after reload failure", recoveryFailureException);
+        reloadFailureException.addSuppressed(recoveryFailureException);
+      }
+      throw reloadFailureException;
+    }
+  }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      throws Exception {
+    if (!isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;
+    }
+
+    // Try to recover if no local metadata is provided.
+    if (localMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
+      localMetadata = recoverSegmentQuietly(segmentName);
+      if (!isNewSegment(zkMetadata, localMetadata)) {
+        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
+            localMetadata.getCrc());
+        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
+          return;
+        }
+        localMetadata = null;
+      }
+    }
+
+    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download",
+        segmentName, _tableNameWithType);
+
+    // Download segment and replace the local one, either due to failure to recover local segment,
+    // or the segment data is updated and has new CRC now.
+    if (localMetadata == null) {
+      LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType);
+    } else {
+      LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName,
+          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    }
+    File indexDir = downloadSegment(segmentName, zkMetadata);
+    addSegment(indexDir, indexLoadingConfig);
+    LOGGER.info("Downloaded and replaced segment: {} of table: {} with local crc now becomes: {} and remote crc: {}",
+        segmentName, _tableNameWithType, new SegmentMetadataImpl(indexDir).getCrc(), zkMetadata.getCrc());
+  }
+
+  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
+    return true;
+  }
+
+  protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    // TODO: may support download from peer servers for RealTime table.
+    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
+  }
+
+  /**
+   * Server restart during segment reload might leave segment directory in inconsistent state, like the index
+   * directory might not exist but segment backup directory existed. This method tries to recover from reload
+   * failure before checking the existence of the index directory and loading segment metadata from it.
+   */
+  private SegmentMetadata recoverSegmentQuietly(String segmentName) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      LoaderUtils.reloadFailureRecovery(indexDir);
+      if (!indexDir.exists()) {
+        LOGGER.info("Segment: {} of table: {} is not found on disk", segmentName, _tableNameWithType);
+        return null;
+      }
+      SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
+      LOGGER.info("Recovered segment: {} of table: {} with crc: {} from disk", segmentName, _tableNameWithType,
+          localMetadata.getCrc());
+      return localMetadata;
+    } catch (Exception e) {
+      LOGGER.error("Failed to recover segment: {} of table: {} from disk", segmentName, _tableNameWithType, e);
+      FileUtils.deleteQuietly(indexDir);
+      return null;
+    }
+  }
+
+  private boolean loadSegmentQuietly(String segmentName, IndexLoadingConfig indexLoadingConfig) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      addSegment(indexDir, indexLoadingConfig);
+      LOGGER.info("Loaded segment: {} of table: {} from disk", segmentName, _tableNameWithType);
+      return true;
+    } catch (Exception e) {
+      FileUtils.deleteQuietly(indexDir);
+      LOGGER.error("Failed to load segment: {} of table: {} from disk", segmentName, _tableNameWithType, e);
+      return false;
+    }
+  }
+
+  private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    File tempRootDir = getSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
+    FileUtils.forceMkdir(tempRootDir);
+    try {
+      File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir);
+      return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
+    } finally {
+      FileUtils.deleteQuietly(tempRootDir);
+    }
+  }
+
+  @VisibleForTesting
+  File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir)
+      throws Exception {
+    File tarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    String uri = zkMetadata.getDownloadUrl();
+    try {
+      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, zkMetadata.getCrypterName());
+      LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName,
+          _tableNameWithType, uri, tarFile, tarFile.length());
+      return tarFile;
+    } catch (AttemptsExceededException e) {
+      LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName,
+          _tableNameWithType, uri, tarFile);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
+      Utils.rethrowException(e);
+    }
+    return null;

Review comment:
       Remove this line (`return null;`); it is unnecessary once the catch block rethrows the exception directly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org