You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2022/11/11 19:11:16 UTC

[pinot] branch master updated: Preprocess immutable segments from REALTIME table conditionally when loading them (#9772)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cd1a017deb Preprocess immutable segments from REALTIME table conditionally when loading them (#9772)
cd1a017deb is described below

commit cd1a017debf95e088881b191319c521b2a3a4296
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Fri Nov 11 11:11:07 2022 -0800

    Preprocess immutable segments from REALTIME table conditionally when loading them (#9772)
    
    Immutable segments from OFFLINE tables are reprocessed only when they are no longer consistent with the TableConfig or schema (e.g. when index types need to be updated), which avoids redundant processing. Immutable segments from REALTIME tables, however, are reprocessed unconditionally today.
    
    This PR reuses the utility methods added for OFFLINE tables so that both table types behave consistently when loading immutable segments. It also makes the management of temp data files/folders consistent between both types of tables.
---
 .../core/data/manager/BaseTableDataManager.java    |  13 +-
 .../manager/realtime/RealtimeTableDataManager.java |  84 +++++-----
 .../data/manager/BaseTableDataManagerTest.java     | 181 ++++++++-------------
 .../data/manager/TableDataManagerTestUtils.java    | 107 ++++++++++++
 .../realtime/RealtimeTableDataManagerTest.java     | 172 ++++++++++++++++++++
 5 files changed, 391 insertions(+), 166 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index b5f990416b..6b37af6c8b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -474,7 +474,6 @@ public abstract class BaseTableDataManager implements TableDataManager {
   private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata zkMetadata)
       throws Exception {
     File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
-    FileUtils.forceMkdir(tempRootDir);
     if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
       try {
         File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName, zkMetadata, tempRootDir,
@@ -622,8 +621,14 @@ public abstract class BaseTableDataManager implements TableDataManager {
   }
 
   @VisibleForTesting
-  protected File getTmpSegmentDataDir(String segmentName) {
-    return new File(_resourceTmpDir, segmentName);
+  protected File getTmpSegmentDataDir(String segmentName)
+      throws IOException {
+    File tmpDir = new File(_resourceTmpDir, segmentName);
+    if (tmpDir.exists()) {
+      FileUtils.deleteQuietly(tmpDir);
+    }
+    FileUtils.forceMkdir(tmpDir);
+    return tmpDir;
   }
 
   /**
@@ -674,7 +679,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
    * object may be created when trying to load the segment, but it's closed if the method
    * returns false; otherwise it's opened and to be referred by ImmutableSegment object.
    */
-  private boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+  protected boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
       SegmentZKMetadata zkMetadata) {
     // Try to recover the segment from potential segment reloading failure.
     String segmentTier = zkMetadata.getTier();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 5c3fc7ce04..2c616c438c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -295,25 +295,21 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
     boolean isHLCSegment = SegmentName.isHighLevelConsumerSegmentName(segmentName);
     if (segmentZKMetadata.getStatus().isCompleted()) {
-      if (segmentDir.exists()) {
-        // Local segment exists, try to load it
-        try {
-          addSegment(ImmutableSegmentLoader.load(segmentDir, indexLoadingConfig, schema));
-          return;
-        } catch (Exception e) {
-          if (!isHLCSegment) {
-            // For LLC and uploaded segments, delete the local copy and download a new copy
-            _logger.error("Caught exception while loading segment: {}, downloading a new copy", segmentName, e);
-            FileUtils.deleteQuietly(segmentDir);
-          } else {
-            // For HLC segments, throw out the exception because there is no way to recover (controller does not have a
-            // copy of the segment)
-            throw new RuntimeException("Failed to load local HLC segment: " + segmentName, e);
-          }
-        }
+      if (isHLCSegment && !segmentDir.exists()) {
+        throw new RuntimeException("Failed to find local copy for committed HLC segment: " + segmentName);
+      }
+      if (tryLoadExistingSegment(segmentName, indexLoadingConfig, segmentZKMetadata)) {
+        // The existing completed segment has been loaded successfully
+        return;
       } else {
-        if (isHLCSegment) {
-          throw new RuntimeException("Failed to find local copy for committed HLC segment: " + segmentName);
+        if (!isHLCSegment) {
+          // For LLC and uploaded segments, delete the local copy and download a new copy
+          _logger.error("Failed to load LLC segment: {}, downloading a new copy", segmentName);
+          FileUtils.deleteQuietly(segmentDir);
+        } else {
+          // For HLC segments, throw out the exception because there is no way to recover (controller does not have a
+          // copy of the segment)
+          throw new RuntimeException("Failed to load local HLC segment: " + segmentName);
         }
       }
       // Local segment doesn't exist or cannot load, download a new copy
@@ -441,6 +437,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     String uri = segmentZKMetadata.getDownloadUrl();
     if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) {
       try {
+        // TODO: cleanup and consolidate the segment loading logic a bit for OFFLINE and REALTIME tables.
+        //       https://github.com/apache/pinot/issues/9752
         downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
       } catch (Exception e) {
         _logger.warn("Download segment {} from deepstore uri {} failed.", segmentName, uri, e);
@@ -463,46 +461,38 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   }
 
   private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) {
-    File segmentTarFile = new File(_indexDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-    if (segmentTarFile.exists()) {
-      _logger.warn(
-          "Segment tar file: {} already exists (possibly due to server restart/crash resulting in skipped cleanup). "
-              + "Deleting it before fetching again from uri: {}",
-          segmentName, uri);
-      FileUtils.deleteQuietly(segmentTarFile);
-    }
+    // This could leave temporary directories in _indexDir if JVM shuts down before the temp directory is deleted.
+    // This is fine since the temporary directories are deleted when the table data manager calls init.
+    File tempRootDir = null;
     try {
+      tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis());
+      File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
       SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile);
       _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, segmentTarFile,
           segmentTarFile.length());
-      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile);
+      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir);
     } catch (Exception e) {
       _logger.warn("Failed to download segment {} from deep store: ", segmentName, e);
       throw new RuntimeException(e);
     } finally {
-      FileUtils.deleteQuietly(segmentTarFile);
+      FileUtils.deleteQuietly(tempRootDir);
     }
   }
 
   /**
    * Untars the new segment and replaces the existing segment.
    */
-  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File segmentTarFile)
+  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File segmentTarFile,
+      File tempRootDir)
       throws IOException {
-    // This could leave temporary directories in _indexDir if JVM shuts down before the temp directory is deleted.
-    // This is fine since the temporary directories are deleted when the table data manager calls init.
-    File tempSegmentDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis());
-    try {
-      File tempIndexDir = TarGzCompressionUtils.untar(segmentTarFile, tempSegmentDir).get(0);
-      _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile, tempSegmentDir);
-      File indexDir = new File(_indexDir, segmentName);
-      FileUtils.deleteQuietly(indexDir);
-      FileUtils.moveDirectory(tempIndexDir, indexDir);
-      _logger.info("Replacing LLC Segment {}", segmentName);
-      replaceLLSegment(segmentName, indexLoadingConfig);
-    } finally {
-      FileUtils.deleteQuietly(tempSegmentDir);
-    }
+    File untarDir = new File(tempRootDir, segmentName);
+    File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile, untarDir).get(0);
+    _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile, untarDir);
+    File indexDir = new File(_indexDir, segmentName);
+    FileUtils.deleteQuietly(indexDir);
+    FileUtils.moveDirectory(untaredSegDir, indexDir);
+    _logger.info("Replacing LLC Segment {}", segmentName);
+    replaceLLSegment(segmentName, indexLoadingConfig);
   }
 
   private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
@@ -514,20 +504,22 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   private void downloadSegmentFromPeer(String segmentName, String downloadScheme,
       IndexLoadingConfig indexLoadingConfig) {
-    File segmentTarFile = new File(_indexDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    File tempRootDir = null;
     try {
+      tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis());
+      File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
       // First find servers hosting the segment in a ONLINE state.
       List<URI> peerSegmentURIs = PeerServerSegmentFinder.getPeerServerURIs(segmentName, downloadScheme, _helixManager);
       // Next download the segment from a randomly chosen server using configured scheme.
       SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(peerSegmentURIs, segmentTarFile);
       _logger.info("Fetched segment {} from: {} to: {} of size: {}", segmentName, peerSegmentURIs, segmentTarFile,
           segmentTarFile.length());
-      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile);
+      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir);
     } catch (Exception e) {
       _logger.warn("Download and move segment {} from peer with scheme {} failed.", segmentName, downloadScheme, e);
       throw new RuntimeException(e);
     } finally {
-      FileUtils.deleteQuietly(segmentTarFile);
+      FileUtils.deleteQuietly(tempRootDir);
     }
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index a0d2bee629..62b55df352 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pinot.core.data.manager;
 
-import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -37,6 +35,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
@@ -49,7 +48,6 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TierConfig;
@@ -61,7 +59,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.util.TestUtils;
@@ -69,9 +66,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY;
-import static org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY;
-import static org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_WAIT_MS_CONFIG_KEY;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -94,7 +88,7 @@ public class BaseTableDataManagerTest {
   public void setUp()
       throws Exception {
     TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
-    initSegmentFetcher();
+    TableDataManagerTestUtils.initSegmentFetcher();
   }
 
   @AfterMethod
@@ -117,7 +111,7 @@ public class BaseTableDataManagerTest {
 
     BaseTableDataManager tmgr = createTableManager();
     assertFalse(tmgr.getSegmentDataDir(segName).exists());
-    tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, null, false);
+    tmgr.reloadSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, llmd, null, false);
     assertTrue(tmgr.getSegmentDataDir(segName).exists());
     llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
     assertEquals(llmd.getTotalDocs(), 5);
@@ -130,7 +124,7 @@ public class BaseTableDataManagerTest {
     String segName = "seg01";
     SegmentZKMetadata zkmd = createRawSegment(tableConfig, segName, SegmentVersion.v3, 5);
     String tierName = "coolTier";
-    when(zkmd.getTier()).thenReturn(tierName);
+    zkmd.setTier(tierName);
 
     // Mock the case where segment is loaded but its CRC is different from
     // the one in zk, thus raw segment is downloaded and loaded.
@@ -141,7 +135,8 @@ public class BaseTableDataManagerTest {
     BaseTableDataManager tmgr = createTableManager();
     File defaultSegDir = tmgr.getSegmentDataDir(segName);
     assertFalse(defaultSegDir.exists());
-    tmgr.reloadSegment(segName, createIndexLoadingConfig("tierBased", tableConfig), zkmd, llmd, null, false);
+    tmgr.reloadSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig, null),
+        zkmd, llmd, null, false);
     assertTrue(defaultSegDir.exists());
     llmd = new SegmentMetadataImpl(defaultSegDir);
     assertEquals(llmd.getTotalDocs(), 5);
@@ -151,7 +146,7 @@ public class BaseTableDataManagerTest {
     when(llmd.getCrc()).thenReturn("0");
     tableConfig = createTableConfigWithTier(tierName, new File(TEMP_DIR, tierName));
     tmgr = createTableManager();
-    IndexLoadingConfig loadingCfg = createIndexLoadingConfig("tierBased", tableConfig);
+    IndexLoadingConfig loadingCfg = TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig, null);
     tmgr.reloadSegment(segName, loadingCfg, zkmd, llmd, null, false);
     File segDirOnTier = tmgr.getSegmentDataDir(segName, tierName, loadingCfg.getTableConfig());
     assertTrue(segDirOnTier.exists());
@@ -167,23 +162,23 @@ public class BaseTableDataManagerTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     String segName = "seg01";
     File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v1, 5);
-    String segCrc = getCRC(localSegDir, SegmentVersion.v1);
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v1);
 
     // Same CRCs so load the local segment directory directly.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    when(zkmd.getCrc()).thenReturn(segCrc);
     SegmentMetadata llmd = mock(SegmentMetadata.class);
-    when(llmd.getCrc()).thenReturn(segCrc);
+    when(llmd.getCrc()).thenReturn(Long.toString(segCrc));
 
     BaseTableDataManager tmgr = createTableManager();
-    tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, null, false);
+    tmgr.reloadSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, llmd, null, false);
     assertTrue(tmgr.getSegmentDataDir(segName).exists());
     llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
     assertEquals(llmd.getTotalDocs(), 5);
 
     FileUtils.deleteQuietly(localSegDir);
     try {
-      tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, null, false);
+      tmgr.reloadSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, llmd, null, false);
       fail();
     } catch (Exception e) {
       // As expected, segment reloading fails due to missing the local segment dir.
@@ -197,19 +192,20 @@ public class BaseTableDataManagerTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     String segName = "seg01";
     File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v1, 5);
-    String segCrc = getCRC(localSegDir, SegmentVersion.v1);
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v1);
 
     // Same CRCs so load the local segment directory directly.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    when(zkmd.getCrc()).thenReturn(segCrc);
     String tierName = "coolTier";
     when(zkmd.getTier()).thenReturn(tierName);
     SegmentMetadata llmd = mock(SegmentMetadata.class);
-    when(llmd.getCrc()).thenReturn(segCrc);
+    when(llmd.getCrc()).thenReturn(Long.toString(segCrc));
 
     // No dataDir for coolTier, thus stay on default tier.
     BaseTableDataManager tmgr = createTableManager();
-    tmgr.reloadSegment(segName, createIndexLoadingConfig("tierBased", tableConfig), zkmd, llmd, null, false);
+    tmgr.reloadSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig, null),
+        zkmd, llmd, null, false);
     assertTrue(tmgr.getSegmentDataDir(segName).exists());
     llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
     assertEquals(llmd.getTotalDocs(), 5);
@@ -217,10 +213,10 @@ public class BaseTableDataManagerTest {
 
     // Configured dataDir for coolTier, thus move to new dir.
     llmd = mock(SegmentMetadata.class);
-    when(llmd.getCrc()).thenReturn(segCrc);
+    when(llmd.getCrc()).thenReturn(Long.toString(segCrc));
     tableConfig = createTableConfigWithTier(tierName, new File(TEMP_DIR, tierName));
     tmgr = createTableManager();
-    IndexLoadingConfig loadingCfg = createIndexLoadingConfig("tierBased", tableConfig);
+    IndexLoadingConfig loadingCfg = TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig, null);
     tmgr.reloadSegment(segName, loadingCfg, zkmd, llmd, null, false);
     File segDirOnTier = tmgr.getSegmentDataDir(segName, tierName, loadingCfg.getTableConfig());
     assertTrue(segDirOnTier.exists());
@@ -236,16 +232,16 @@ public class BaseTableDataManagerTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     String segName = "seg01";
     File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v1, 5);
-    String segCrc = getCRC(localSegDir, SegmentVersion.v1);
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v1);
 
     // Same CRCs so load the local segment directory directly.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    when(zkmd.getCrc()).thenReturn(segCrc);
     SegmentMetadata llmd = mock(SegmentMetadata.class);
-    when(llmd.getCrc()).thenReturn(segCrc);
+    when(llmd.getCrc()).thenReturn(Long.toString(segCrc));
 
     // Require to use v3 format.
-    IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+    IndexLoadingConfig idxCfg = TableDataManagerTestUtils.createIndexLoadingConfig();
     idxCfg.setSegmentVersion(SegmentVersion.v3);
 
     BaseTableDataManager tmgr = createTableManager();
@@ -262,18 +258,18 @@ public class BaseTableDataManagerTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     String segName = "seg01";
     File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 5);
-    String segCrc = getCRC(localSegDir, SegmentVersion.v3);
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v3);
     assertFalse(hasInvertedIndex(localSegDir, STRING_COLUMN, SegmentVersion.v3));
     assertFalse(hasInvertedIndex(localSegDir, LONG_COLUMN, SegmentVersion.v3));
 
     // Same CRCs so load the local segment directory directly.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    when(zkmd.getCrc()).thenReturn(segCrc);
     SegmentMetadata llmd = mock(SegmentMetadata.class);
-    when(llmd.getCrc()).thenReturn(segCrc);
+    when(llmd.getCrc()).thenReturn(Long.toString(segCrc));
 
     // Require to add indices.
-    IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+    IndexLoadingConfig idxCfg = TableDataManagerTestUtils.createIndexLoadingConfig();
     idxCfg.setSegmentVersion(SegmentVersion.v3);
     idxCfg.setInvertedIndexColumns(new HashSet<>(Arrays.asList(STRING_COLUMN, LONG_COLUMN)));
 
@@ -291,8 +287,9 @@ public class BaseTableDataManagerTest {
       throws Exception {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     String segName = "seg01";
-    SegmentZKMetadata zkmd = createRawSegment(tableConfig, segName, SegmentVersion.v3, 5);
     File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 5);
+    SegmentZKMetadata zkmd = TableDataManagerTestUtils.makeRawSegment(segName, localSegDir,
+        new File(TEMP_DIR, segName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION), false);
 
     // Same CRC but force to download.
     BaseTableDataManager tmgr = createTableManager();
@@ -302,14 +299,14 @@ public class BaseTableDataManagerTest {
     // Remove the local segment dir. Segment reloading fails unless force to download.
     FileUtils.deleteQuietly(localSegDir);
     try {
-      tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, null, false);
+      tmgr.reloadSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, llmd, null, false);
       fail();
     } catch (Exception e) {
       // As expected, segment reloading fails due to missing the local segment dir.
       assertTrue(e.getMessage().contains("does not exist or is not a directory"));
     }
 
-    tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, null, true);
+    tmgr.reloadSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, llmd, null, true);
     assertTrue(tmgr.getSegmentDataDir(segName).exists());
 
     llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
@@ -331,7 +328,7 @@ public class BaseTableDataManagerTest {
 
     BaseTableDataManager tmgr = createTableManager();
     assertFalse(tmgr.getSegmentDataDir(segName).exists());
-    tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, llmd);
+    tmgr.addOrReplaceSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, llmd);
     assertTrue(tmgr.getSegmentDataDir(segName).exists());
     llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
     assertEquals(llmd.getTotalDocs(), 5);
@@ -344,7 +341,7 @@ public class BaseTableDataManagerTest {
     String segName = "seg01";
     SegmentZKMetadata zkmd = createRawSegment(tableConfig, segName, SegmentVersion.v3, 5);
     String tierName = "coolTier";
-    when(zkmd.getTier()).thenReturn(tierName);
+    zkmd.setTier(tierName);
 
     // Mock the case where segment is loaded but its CRC is different from
     // the one in zk, thus raw segment is downloaded and loaded.
@@ -355,7 +352,8 @@ public class BaseTableDataManagerTest {
     BaseTableDataManager tmgr = createTableManager();
     File defaultSegDir = tmgr.getSegmentDataDir(segName);
     assertFalse(defaultSegDir.exists());
-    tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig("tierBased", tableConfig), zkmd, llmd);
+    tmgr.addOrReplaceSegment(segName,
+        TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig, null), zkmd, llmd);
     assertTrue(defaultSegDir.exists());
     llmd = new SegmentMetadataImpl(defaultSegDir);
     assertEquals(llmd.getTotalDocs(), 5);
@@ -365,7 +363,7 @@ public class BaseTableDataManagerTest {
     when(llmd.getCrc()).thenReturn("0");
     tableConfig = createTableConfigWithTier(tierName, new File(TEMP_DIR, tierName));
     tmgr = createTableManager();
-    IndexLoadingConfig loadingCfg = createIndexLoadingConfig("tierBased", tableConfig);
+    IndexLoadingConfig loadingCfg = TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig, null);
     tmgr.addOrReplaceSegment(segName, loadingCfg, zkmd, llmd);
     File segDirOnTier = tmgr.getSegmentDataDir(segName, tierName, loadingCfg.getTableConfig());
     assertTrue(segDirOnTier.exists());
@@ -386,7 +384,7 @@ public class BaseTableDataManagerTest {
 
     BaseTableDataManager tmgr = createTableManager();
     assertFalse(tmgr.getSegmentDataDir("seg01").exists());
-    tmgr.addOrReplaceSegment("seg01", createIndexLoadingConfig(), zkmd, llmd);
+    tmgr.addOrReplaceSegment("seg01", TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, llmd);
     // As CRC is same, the index dir is left as is, so not get created by the test.
     assertFalse(tmgr.getSegmentDataDir("seg01").exists());
   }
@@ -397,22 +395,22 @@ public class BaseTableDataManagerTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     String segName = "seg01";
     File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 5);
-    String segCrc = getCRC(localSegDir, SegmentVersion.v3);
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v3);
 
     // Make local and remote CRC same to skip downloading raw segment.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    when(zkmd.getCrc()).thenReturn(segCrc);
     when(zkmd.getDownloadUrl()).thenReturn("file://somewhere");
 
     BaseTableDataManager tmgr = createTableManager();
-    tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, null);
+    tmgr.addOrReplaceSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, null);
     assertTrue(tmgr.getSegmentDataDir(segName).exists());
     SegmentMetadataImpl llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
     assertEquals(llmd.getTotalDocs(), 5);
 
     FileUtils.deleteQuietly(localSegDir);
     try {
-      tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, null);
+      tmgr.addOrReplaceSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, null);
       fail();
     } catch (Exception e) {
       // As expected, when local segment dir is missing, it tries to download
@@ -427,18 +425,19 @@ public class BaseTableDataManagerTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     String segName = "seg01";
     File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 5);
-    String segCrc = getCRC(localSegDir, SegmentVersion.v3);
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v3);
 
     // Make local and remote CRC same to skip downloading raw segment.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    when(zkmd.getCrc()).thenReturn(segCrc);
     when(zkmd.getDownloadUrl()).thenReturn("file://somewhere");
     String tierName = "coolTier";
     when(zkmd.getTier()).thenReturn(tierName);
 
     // No dataDir for coolTier, thus stay on default tier.
     BaseTableDataManager tmgr = createTableManager();
-    tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig("tierBased", tableConfig), zkmd, null);
+    tmgr.addOrReplaceSegment(segName,
+        TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig, null), zkmd, null);
     assertTrue(tmgr.getSegmentDataDir(segName).exists());
     SegmentMetadataImpl llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
     assertEquals(llmd.getTotalDocs(), 5);
@@ -446,7 +445,7 @@ public class BaseTableDataManagerTest {
 
     // Configured dataDir for coolTier, thus move to new dir.
     tableConfig = createTableConfigWithTier(tierName, new File(TEMP_DIR, tierName));
-    IndexLoadingConfig loadingCfg = createIndexLoadingConfig("tierBased", tableConfig);
+    IndexLoadingConfig loadingCfg = TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig, null);
     File segDirOnTier = tmgr.getSegmentDataDir(segName, tierName, loadingCfg.getTableConfig());
     assertFalse(segDirOnTier.exists());
     // Move segDir to new tier to see if addOrReplaceSegment() can load segDir from new tier directly.
@@ -464,18 +463,18 @@ public class BaseTableDataManagerTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     String segName = "seg01";
     File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 5);
-    String segCrc = getCRC(localSegDir, SegmentVersion.v3);
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v3);
 
     // Make local and remote CRC same to skip downloading raw segment.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    when(zkmd.getCrc()).thenReturn(segCrc);
 
     BaseTableDataManager tmgr = createTableManager();
     File backup = tmgr.getSegmentDataDir(segName + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
     localSegDir.renameTo(backup);
 
     assertFalse(tmgr.getSegmentDataDir(segName).exists());
-    tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, null);
+    tmgr.addOrReplaceSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, null);
     assertTrue(tmgr.getSegmentDataDir(segName).exists());
     SegmentMetadataImpl llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
     assertEquals(llmd.getTotalDocs(), 5);
@@ -496,7 +495,7 @@ public class BaseTableDataManagerTest {
     localSegDir.renameTo(backup);
 
     assertFalse(tmgr.getSegmentDataDir(segName).exists());
-    tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, null);
+    tmgr.addOrReplaceSegment(segName, TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, null);
     assertTrue(tmgr.getSegmentDataDir(segName).exists());
     SegmentMetadataImpl llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
     assertEquals(llmd.getTotalDocs(), 5);
@@ -508,14 +507,14 @@ public class BaseTableDataManagerTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     String segName = "seg01";
     File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v1, 5);
-    String segCrc = getCRC(localSegDir, SegmentVersion.v1);
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v1);
 
     // Make local and remote CRC same to skip downloading raw segment.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    when(zkmd.getCrc()).thenReturn(segCrc);
 
     // Require to use v3 format.
-    IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+    IndexLoadingConfig idxCfg = TableDataManagerTestUtils.createIndexLoadingConfig();
     idxCfg.setSegmentVersion(SegmentVersion.v3);
 
     BaseTableDataManager tmgr = createTableManager();
@@ -532,14 +531,14 @@ public class BaseTableDataManagerTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     String segName = "seg01";
     File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 5);
-    String segCrc = getCRC(localSegDir, SegmentVersion.v3);
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v3);
 
     // Make local and remote CRC same to skip downloading raw segment.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    when(zkmd.getCrc()).thenReturn(segCrc);
 
     // Require to use v1 format.
-    IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+    IndexLoadingConfig idxCfg = TableDataManagerTestUtils.createIndexLoadingConfig();
     idxCfg.setSegmentVersion(SegmentVersion.v1);
 
     BaseTableDataManager tmgr = createTableManager();
@@ -557,16 +556,16 @@ public class BaseTableDataManagerTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     String segName = "seg01";
     File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 5);
-    String segCrc = getCRC(localSegDir, SegmentVersion.v3);
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v3);
     assertFalse(hasInvertedIndex(localSegDir, STRING_COLUMN, SegmentVersion.v3));
     assertFalse(hasInvertedIndex(localSegDir, LONG_COLUMN, SegmentVersion.v3));
 
     // Make local and remote CRC same to skip downloading raw segment.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    when(zkmd.getCrc()).thenReturn(segCrc);
 
     // Require to add indices.
-    IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+    IndexLoadingConfig idxCfg = TableDataManagerTestUtils.createIndexLoadingConfig();
     idxCfg.setSegmentVersion(SegmentVersion.v3);
     idxCfg.setInvertedIndexColumns(new HashSet<>(Arrays.asList(STRING_COLUMN, LONG_COLUMN)));
 
@@ -606,7 +605,7 @@ public class BaseTableDataManagerTest {
     try {
       // Set maxRetry to 0 to cause retry failure immediately.
       Map<String, Object> properties = new HashMap<>();
-      properties.put(RETRY_COUNT_CONFIG_KEY, 0);
+      properties.put(BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY, 0);
       SegmentFetcherFactory.init(new PinotConfiguration(properties));
       tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir);
       fail();
@@ -661,36 +660,6 @@ public class BaseTableDataManagerTest {
     }
   }
 
-  private static void initSegmentFetcher()
-      throws Exception {
-    Map<String, Object> properties = new HashMap<>();
-    properties.put(RETRY_COUNT_CONFIG_KEY, 3);
-    properties.put(RETRY_WAIT_MS_CONFIG_KEY, 100);
-    properties.put(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, 5);
-    SegmentFetcherFactory.init(new PinotConfiguration(properties));
-
-    // Setup crypter
-    properties.put("class.fakePinotCrypter", FakePinotCrypter.class.getName());
-    PinotCrypterFactory.init(new PinotConfiguration(properties));
-  }
-
-  private static IndexLoadingConfig createIndexLoadingConfig() {
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-    return indexLoadingConfig;
-  }
-
-  private static IndexLoadingConfig createIndexLoadingConfig(String segDirLoader, TableConfig tableConfig) {
-    InstanceDataManagerConfig idmc = mock(InstanceDataManagerConfig.class);
-    when(idmc.getSegmentDirectoryLoader()).thenReturn(segDirLoader);
-    when(idmc.getConfig()).thenReturn(new PinotConfiguration());
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(idmc, tableConfig);
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-    return indexLoadingConfig;
-  }
-
   private static BaseTableDataManager createTableManager() {
     TableDataManagerConfig config = mock(TableDataManagerConfig.class);
     when(config.getTableName()).thenReturn(TABLE_NAME_WITH_TYPE);
@@ -708,17 +677,9 @@ public class BaseTableDataManagerTest {
   private static SegmentZKMetadata createRawSegment(TableConfig tableConfig, String segName, SegmentVersion segVer,
       int rowCnt)
       throws Exception {
-    File segDir = createSegment(tableConfig, segName, segVer, rowCnt);
-    String segCrc = getCRC(segDir, SegmentVersion.v3);
-
-    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    File tempTar = new File(TEMP_DIR, segName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-    TarGzCompressionUtils.createTarGzFile(segDir, tempTar);
-    when(zkmd.getDownloadUrl()).thenReturn("file://" + tempTar.getAbsolutePath());
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
-
-    FileUtils.deleteQuietly(segDir);
-    return zkmd;
+    File localSegDir = createSegment(tableConfig, segName, segVer, rowCnt);
+    return TableDataManagerTestUtils.makeRawSegment(segName, localSegDir,
+        new File(TEMP_DIR, segName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION), true);
   }
 
   private static File createSegment(TableConfig tableConfig, String segName, SegmentVersion segVer, int rowCnt)
@@ -742,18 +703,6 @@ public class BaseTableDataManagerTest {
     return new File(TABLE_DATA_DIR, segName);
   }
 
-  private static String getCRC(File segDir, SegmentVersion segVer)
-      throws IOException {
-    File parentDir = segDir;
-    if (segVer == SegmentVersion.v3) {
-      parentDir = new File(segDir, "v3");
-    }
-    File crcFile = new File(parentDir, V1Constants.SEGMENT_CREATION_META);
-    try (DataInputStream ds = new DataInputStream(new FileInputStream(crcFile))) {
-      return String.valueOf(ds.readLong());
-    }
-  }
-
   private static boolean hasInvertedIndex(File segDir, String colName, SegmentVersion segVer)
       throws IOException {
     File parentDir = segDir;
@@ -765,8 +714,8 @@ public class BaseTableDataManagerTest {
   }
 
   private TableConfig createTableConfigWithTier(String tierName, File dataDir) {
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Collections
-        .singletonList(new TierConfig(tierName, TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "3d", null,
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
+        Collections.singletonList(new TierConfig(tierName, TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "3d", null,
             TierFactory.PINOT_SERVER_STORAGE_TYPE, "tag_OFFLINE", null,
             Collections.singletonMap("dataDir", dataDir.getAbsolutePath())))).build();
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/TableDataManagerTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/TableDataManagerTestUtils.java
new file mode 100644
index 0000000000..510e6c1232
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/TableDataManagerTestUtils.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher;
+import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.crypt.PinotCrypterFactory;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.ReadMode;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TableDataManagerTestUtils {
+  private TableDataManagerTestUtils() {
+  }
+
+  public static long getCRC(File segDir, SegmentVersion segVer)
+      throws IOException {
+    File parentDir = segDir;
+    if (segVer == SegmentVersion.v3) {
+      parentDir = new File(segDir, "v3");
+    }
+    File crcFile = new File(parentDir, V1Constants.SEGMENT_CREATION_META);
+    try (DataInputStream ds = new DataInputStream(new FileInputStream(crcFile))) {
+      return ds.readLong();
+    }
+  }
+
+  public static SegmentZKMetadata makeRawSegment(String segName, File localSegDir, File rawSegDir,
+      boolean deleteLocalSegDir)
+      throws Exception {
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v3);
+    SegmentZKMetadata zkmd = new SegmentZKMetadata(segName);
+    TarGzCompressionUtils.createTarGzFile(localSegDir, rawSegDir);
+    zkmd.setDownloadUrl("file://" + rawSegDir.getAbsolutePath());
+    zkmd.setCrc(segCrc);
+    if (deleteLocalSegDir) {
+      FileUtils.deleteQuietly(localSegDir);
+    }
+    return zkmd;
+  }
+
+  public static void initSegmentFetcher()
+      throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY, 3);
+    properties.put(BaseSegmentFetcher.RETRY_WAIT_MS_CONFIG_KEY, 100);
+    properties.put(BaseSegmentFetcher.RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, 5);
+    SegmentFetcherFactory.init(new PinotConfiguration(properties));
+
+    // Set up the crypter
+    properties.put("class.fakePinotCrypter", BaseTableDataManagerTest.FakePinotCrypter.class.getName());
+    PinotCrypterFactory.init(new PinotConfiguration(properties));
+  }
+
+  public static IndexLoadingConfig createIndexLoadingConfig(String segDirLoader, TableConfig tableConfig,
+      @Nullable Schema schema) {
+    InstanceDataManagerConfig idmc = mock(InstanceDataManagerConfig.class);
+    when(idmc.getSegmentDirectoryLoader()).thenReturn(segDirLoader);
+    when(idmc.getConfig()).thenReturn(new PinotConfiguration());
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(idmc, tableConfig, schema);
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
+    return indexLoadingConfig;
+  }
+
+  public static IndexLoadingConfig createIndexLoadingConfig() {
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
+    return indexLoadingConfig;
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index e2afd3c9c1..f9eeb6a536 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -18,19 +18,145 @@
  */
 package org.apache.pinot.core.data.manager.realtime;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.HLCSegmentName;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SchemaUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.core.data.manager.TableDataManagerTestUtils;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 
 public class RealtimeTableDataManagerTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "RealtimeTableDataManagerTest");
+  private static final String TABLE_NAME = "table01";
+  private static final String TABLE_NAME_WITH_TYPE = "table01_REALTIME";
+  private static final File TABLE_DATA_DIR = new File(TEMP_DIR, TABLE_NAME_WITH_TYPE);
+  private static final String STRING_COLUMN = "col1";
+  private static final String[] STRING_VALUES = {"A", "D", "E", "B", "C"};
+  private static final String LONG_COLUMN = "col2";
+  private static final long[] LONG_VALUES = {10000L, 20000L, 50000L, 40000L, 30000L};
+
+  @BeforeMethod
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+    TableDataManagerTestUtils.initSegmentFetcher();
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
+  }
+
+  @Test
+  public void testAddSegmentUseBackupCopy()
+      throws Exception {
+    RealtimeTableDataManager tmgr = new RealtimeTableDataManager(null);
+    TableDataManagerConfig tableDataManagerConfig = createTableDataManagerConfig();
+    ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+    TableConfig tableConfig = setupTableConfig(propertyStore);
+    Schema schema = setupSchema(propertyStore);
+    tmgr.init(tableDataManagerConfig, "server01", propertyStore,
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+        new TableDataManagerParams(0, false, -1));
+
+    // Create a dummy local segment.
+    String segName = "seg01";
+    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segName);
+    segmentZKMetadata.setStatus(Status.DONE);
+    File localSegDir = createSegment(tableConfig, schema, segName);
+    long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v3);
+    segmentZKMetadata.setCrc(segCrc);
+    when(propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForSegment(TABLE_NAME_WITH_TYPE, segName), null,
+        AccessOption.PERSISTENT)).thenReturn(segmentZKMetadata.toZNRecord());
+
+    // Move the segment to the backup location.
+    File backup = new File(TABLE_DATA_DIR, segName + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    localSegDir.renameTo(backup);
+    assertEquals(localSegDir, new File(TABLE_DATA_DIR, segName));
+    assertFalse(localSegDir.exists());
+    IndexLoadingConfig indexLoadingConfig =
+        TableDataManagerTestUtils.createIndexLoadingConfig("default", tableConfig, schema);
+    tmgr.addSegment(segName, tableConfig, indexLoadingConfig);
+    // Segment data is put back to the default location, and the backup location is deleted.
+    assertTrue(localSegDir.exists());
+    assertFalse(backup.exists());
+    SegmentMetadataImpl llmd = new SegmentMetadataImpl(new File(TABLE_DATA_DIR, segName));
+    assertEquals(llmd.getTotalDocs(), 5);
+  }
+
+  @Test
+  public void testAddSegmentNoBackupCopy()
+      throws Exception {
+    RealtimeTableDataManager tmgr = new RealtimeTableDataManager(null);
+    TableDataManagerConfig tableDataManagerConfig = createTableDataManagerConfig();
+    ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+    TableConfig tableConfig = setupTableConfig(propertyStore);
+    Schema schema = setupSchema(propertyStore);
+    tmgr.init(tableDataManagerConfig, "server01", propertyStore,
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+        new TableDataManagerParams(0, false, -1));
+
+    // Create a raw segment and put it in deep store backed by local fs.
+    String segName = "seg01";
+    SegmentZKMetadata segmentZKMetadata =
+        TableDataManagerTestUtils.makeRawSegment(segName, createSegment(tableConfig, schema, segName),
+            new File(TEMP_DIR, segName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION), true);
+    segmentZKMetadata.setStatus(Status.DONE);
+    when(propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForSegment(TABLE_NAME_WITH_TYPE, segName), null,
+        AccessOption.PERSISTENT)).thenReturn(segmentZKMetadata.toZNRecord());
+
+    // Local segment dir doesn't exist, thus downloading from deep store.
+    File localSegDir = new File(TABLE_DATA_DIR, segName);
+    assertFalse(localSegDir.exists());
+    IndexLoadingConfig indexLoadingConfig =
+        TableDataManagerTestUtils.createIndexLoadingConfig("default", tableConfig, schema);
+    tmgr.addSegment(segName, tableConfig, indexLoadingConfig);
+    // Segment data is put in the default location.
+    assertTrue(localSegDir.exists());
+    SegmentMetadataImpl llmd = new SegmentMetadataImpl(new File(TABLE_DATA_DIR, segName));
+    assertEquals(llmd.getTotalDocs(), 5);
+  }
+
   @Test
   public void testAllowDownload() {
     RealtimeTableDataManager mgr = new RealtimeTableDataManager(null);
@@ -53,4 +179,50 @@ public class RealtimeTableDataManagerTest {
     when(zkmd.getDownloadUrl()).thenReturn("remote");
     assertTrue(mgr.allowDownload(llc.getSegmentName(), zkmd));
   }
+
+  private static File createSegment(TableConfig tableConfig, Schema schema, String segName)
+      throws Exception {
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(TABLE_DATA_DIR.getAbsolutePath());
+    config.setSegmentName(segName);
+    config.setSegmentVersion(SegmentVersion.v3);
+    List<GenericRow> rows = new ArrayList<>(3);
+    for (int i = 0; i < STRING_VALUES.length; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue(STRING_COLUMN, STRING_VALUES[i]);
+      row.putValue(LONG_COLUMN, LONG_VALUES[i]);
+      rows.add(row);
+    }
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(config, new GenericRowRecordReader(rows));
+    driver.build();
+    return new File(TABLE_DATA_DIR, segName);
+  }
+
+  private static TableDataManagerConfig createTableDataManagerConfig() {
+    TableDataManagerConfig tableDataManagerConfig = mock(TableDataManagerConfig.class);
+    when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME_WITH_TYPE);
+    when(tableDataManagerConfig.getDataDir()).thenReturn(TABLE_DATA_DIR.getAbsolutePath());
+    return tableDataManagerConfig;
+  }
+
+  private static TableConfig setupTableConfig(ZkHelixPropertyStore propertyStore)
+      throws Exception {
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setSchemaName(TABLE_NAME).build();
+    ZNRecord tableConfigZNRecord = TableConfigUtils.toZNRecord(tableConfig);
+    when(propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForResourceConfig(TABLE_NAME_WITH_TYPE), null,
+        AccessOption.PERSISTENT)).thenReturn(tableConfigZNRecord);
+    return tableConfig;
+  }
+
+  private static Schema setupSchema(ZkHelixPropertyStore propertyStore) {
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
+        .addMetric(LONG_COLUMN, FieldSpec.DataType.LONG).build();
+    ZNRecord schemaZNRecord = SchemaUtils.toZNRecord(schema);
+    when(propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForSchema(TABLE_NAME), null,
+        AccessOption.PERSISTENT)).thenReturn(schemaZNRecord);
+    return schema;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org