You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2023/06/08 03:55:37 UTC

[pinot] branch master updated: Bug Fix: Segment Purger cannot purge old segments after schema evolution (#10869)

This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 26e5952d75 Bug Fix: Segment Purger cannot purge old segments after schema evolution (#10869)
26e5952d75 is described below

commit 26e5952d75ff5accedd05c5eb9812cf900d3f413
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Wed Jun 7 20:55:29 2023 -0700

    Bug Fix: Segment Purger cannot purge old segments after schema evolution (#10869)
---
 .../controller/helix/ControllerRequestClient.java  |  10 ++
 .../pinot/controller/helix/ControllerTest.java     |   5 +
 .../apache/pinot/core/minion/SegmentPurger.java    |   9 +-
 .../pinot/core/minion/SegmentPurgerTest.java       |   7 +-
 .../tests/PurgeMinionClusterIntegrationTest.java   | 151 +++++++++++++++------
 .../minion/tasks/purge/PurgeTaskExecutor.java      |   7 +-
 .../minion/tasks/purge/PurgeTaskExecutorTest.java  |   3 +
 7 files changed, 140 insertions(+), 52 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 2da40ef3e5..e6aa83dea1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -81,6 +81,16 @@ public class ControllerRequestClient {
     }
   }
 
+  public void updateSchema(Schema schema)
+      throws IOException {
+    String url = _controllerRequestURLBuilder.forSchemaUpdate(schema.getSchemaName());
+    try {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendMultipartPutRequest(url, schema.toSingleLineJsonString()));
+    } catch (HttpErrorStatusException e) {
+      throw new IOException(e);
+    }
+  }
+
   public void deleteSchema(String schemaName)
       throws IOException {
     String url = _controllerRequestURLBuilder.forSchemaDelete(schemaName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 41b34a4911..63f575293d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -622,6 +622,11 @@ public class ControllerTest {
     getControllerRequestClient().addSchema(schema);
   }
 
+  public void updateSchema(Schema schema)
+      throws IOException {
+    getControllerRequestClient().updateSchema(schema);
+  }
+
   public Schema getSchema(String schemaName) {
     Schema schema = _helixResourceManager.getSchema(schemaName);
     assertNotNull(schema);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 2b65508585..4094b6b6e3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -28,6 +28,7 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
@@ -45,19 +46,21 @@ public class SegmentPurger {
   private final File _indexDir;
   private final File _workingDir;
   private final TableConfig _tableConfig;
+  private final Schema _schema;
   private final RecordPurger _recordPurger;
   private final RecordModifier _recordModifier;
 
   private int _numRecordsPurged;
   private int _numRecordsModified;
 
-  public SegmentPurger(File indexDir, File workingDir, TableConfig tableConfig, @Nullable RecordPurger recordPurger,
-      @Nullable RecordModifier recordModifier) {
+  public SegmentPurger(File indexDir, File workingDir, TableConfig tableConfig, Schema schema,
+      @Nullable RecordPurger recordPurger, @Nullable RecordModifier recordModifier) {
     Preconditions.checkArgument(recordPurger != null || recordModifier != null,
         "At least one of record purger and modifier should be non-null");
     _indexDir = indexDir;
     _workingDir = workingDir;
     _tableConfig = tableConfig;
+    _schema = schema;
     _recordPurger = recordPurger;
     _recordModifier = recordModifier;
   }
@@ -79,7 +82,7 @@ public class SegmentPurger {
         return null;
       }
 
-      SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, segmentMetadata.getSchema());
+      SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
       config.setOutDir(_workingDir.getPath());
       config.setSegmentName(segmentName);
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
index d29c0d94e3..2dd6b37a3e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
@@ -67,6 +67,7 @@ public class SegmentPurgerTest {
   private static final String D2 = "d2";
 
   private TableConfig _tableConfig;
+  private Schema _schema;
   private File _originalIndexDir;
   private int _expectedNumRecordsPurged;
   private int _expectedNumRecordsModified;
@@ -79,7 +80,7 @@ public class SegmentPurgerTest {
     _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setInvertedIndexColumns(Collections.singletonList(D1)).setCreateInvertedIndexDuringSegmentGeneration(true)
         .build();
-    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT)
+    _schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT)
         .addSingleValueDimension(D2, FieldSpec.DataType.INT).build();
 
     List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
@@ -98,7 +99,7 @@ public class SegmentPurgerTest {
     }
     GenericRowRecordReader genericRowRecordReader = new GenericRowRecordReader(rows);
 
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, schema);
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
     config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
     config.setSegmentName(SEGMENT_NAME);
 
@@ -125,7 +126,7 @@ public class SegmentPurgerTest {
     };
 
     SegmentPurger segmentPurger =
-        new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, recordPurger, recordModifier);
+        new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, _schema, recordPurger, recordModifier);
     File purgedIndexDir = segmentPurger.purgeSegment();
 
     // Check the purge/modify counter in segment purger
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index 284e7655b5..c4ba131f6d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.google.common.collect.ImmutableList;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,8 +36,11 @@ import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
@@ -56,25 +61,21 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
   private static final String PURGE_FIRST_RUN_TABLE = "myTable1";
   private static final String PURGE_DELTA_PASSED_TABLE = "myTable2";
   private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
+  private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE = "myTable4";
+
 
   protected PinotHelixTaskResourceManager _helixTaskResourceManager;
   protected PinotTaskManager _taskManager;
   protected PinotHelixResourceManager _pinotHelixResourceManager;
   protected String _tableName;
 
-  protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
-  protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
-  protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");
-
-  protected final File _tarDir1 = new File(_tempDir, "tarDir1");
-  protected final File _tarDir2 = new File(_tempDir, "tarDir2");
-  protected final File _tarDir3 = new File(_tempDir, "tarDir3");
+  protected final File _segmentDataDir = new File(_tempDir, "segmentDataDir");
+  protected final File _segmentTarDir = new File(_tempDir, "segmentTarDir");
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _tarDir1, _segmentDir2, _tarDir2, _segmentDir3,
-        _tarDir3);
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDataDir, _segmentTarDir);
 
     // Start the Pinot cluster
     startZk();
@@ -82,37 +83,38 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
     startBrokers(1);
     startServers(1);
 
-    // Create and upload the schema and table config
-    Schema schema = createSchema();
-    addSchema(schema);
-    setTableName(PURGE_DELTA_NOT_PASSED_TABLE);
-    TableConfig purgeDeltaNotPassedTableConfig = createOfflineTableConfig();
-    purgeDeltaNotPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
-    setTableName(PURGE_FIRST_RUN_TABLE);
-    TableConfig purgeTableConfig = createOfflineTableConfig();
-    purgeTableConfig.setTaskConfig(getPurgeTaskConfig());
-
-    setTableName(PURGE_DELTA_PASSED_TABLE);
-    TableConfig purgeDeltaPassedTableConfig = createOfflineTableConfig();
-    purgeDeltaPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
-
-    addTableConfig(purgeTableConfig);
-    addTableConfig(purgeDeltaPassedTableConfig);
-    addTableConfig(purgeDeltaNotPassedTableConfig);
+    List<String> allTables = ImmutableList.of(
+        PURGE_FIRST_RUN_TABLE,
+        PURGE_DELTA_PASSED_TABLE,
+        PURGE_DELTA_NOT_PASSED_TABLE,
+        PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
+    );
+    Schema schema = null;
+    TableConfig tableConfig = null;
+    for (String tableName : allTables) {
+      // create and upload schema
+      schema = createSchema();
+      schema.setSchemaName(tableName);
+      addSchema(schema);
+
+      // create and upload table config
+      setTableName(tableName);
+      tableConfig = createOfflineTableConfig();
+      tableConfig.setTaskConfig(getPurgeTaskConfig());
+      addTableConfig(tableConfig);
+    }
 
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
-    // Create and upload segments
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeTableConfig, schema, 0, _segmentDir1, _tarDir1);
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeDeltaPassedTableConfig, schema, 0, _segmentDir2,
-        _tarDir2);
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeDeltaNotPassedTableConfig, schema, 0,
-        _segmentDir3, _tarDir3);
+    // Create segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDataDir,
+        _segmentTarDir);
 
-    uploadSegments(PURGE_FIRST_RUN_TABLE, _tarDir1);
-    uploadSegments(PURGE_DELTA_PASSED_TABLE, _tarDir2);
-    uploadSegments(PURGE_DELTA_NOT_PASSED_TABLE, _tarDir3);
+    // Upload segments for all tables
+    for (String tableName : allTables) {
+      uploadSegments(tableName, _segmentTarDir);
+    }
 
     startMinion();
     setRecordPurger();
@@ -150,10 +152,14 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
   private void setRecordPurger() {
     MinionContext minionContext = MinionContext.getInstance();
     minionContext.setRecordPurgerFactory(rawTableName -> {
-      List<String> tableNames =
-          Arrays.asList(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE);
+      List<String> tableNames = Arrays.asList(
+          PURGE_FIRST_RUN_TABLE,
+          PURGE_DELTA_PASSED_TABLE,
+          PURGE_DELTA_NOT_PASSED_TABLE,
+          PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
+      );
       if (tableNames.contains(rawTableName)) {
-        return row -> row.getValue("Quarter").equals(1);
+        return row -> row.getValue("ArrTime").equals(1);
       } else {
         return null;
       }
@@ -205,11 +211,11 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
     // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
     assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
 
-    // 28057 rows with quarter = 1
+    // 52 rows with ArrTime = 1
     // 115545 totals rows
-    // Expecting 115545 - 28057 = 87488 rows after purging
+    // Expecting 115545 - 52 = 115493 rows after purging
     // It might take some time for server to load the purged segments
-    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_FIRST_RUN_TABLE) == 87488, 60_000L,
+    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_FIRST_RUN_TABLE) == 115493, 60_000L,
         "Failed to get expected purged records");
 
     // Drop the table
@@ -251,11 +257,11 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
     // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
     assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
 
-    // 28057 rows with quarter = 1
+    // 52 rows with ArrTime = 1
     // 115545 totals rows
-    // Expecting 115545 - 28057 = 87488 rows after purging
+    // Expecting 115545 - 52 = 115493 rows after purging
     // It might take some time for server to load the purged segments
-    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_DELTA_PASSED_TABLE) == 87488, 60_000L,
+    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_DELTA_PASSED_TABLE) == 115493, 60_000L,
         "Failed to get expected purged records");
 
     // Drop the table
@@ -302,6 +308,63 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
     verifyTableDelete(offlineTableName);
   }
 
+  /**
+   * Tests purging segments that were built with an older schema and table config.
+   * Two new columns are added after the segments are built, and the table config defines indices on them.
+   */
+  @Test
+  public void testPurgeOnOldSegmentsWithIndicesOnNewColumns()
+      throws Exception {
+
+    // add new columns to schema
+    Schema schema = createSchema();
+    schema.addField(new DimensionFieldSpec("ColumnABC", FieldSpec.DataType.INT, true));
+    schema.addField(new DimensionFieldSpec("ColumnXYZ", FieldSpec.DataType.INT, true));
+    schema.setSchemaName(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+    updateSchema(schema);
+
+    // add indices to the new columns
+    setTableName(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+    TableConfig tableConfig = createOfflineTableConfig();
+    tableConfig.setTaskConfig(getPurgeTaskConfig());
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    List<String> invertedIndices = new ArrayList<>(indexingConfig.getInvertedIndexColumns());
+    invertedIndices.add("ColumnABC");
+    List<String> rangeIndices = new ArrayList<>(indexingConfig.getRangeIndexColumns());
+    rangeIndices.add("ColumnXYZ");
+    indexingConfig.setInvertedIndexColumns(invertedIndices);
+    indexingConfig.setRangeIndexColumns(rangeIndices);
+    updateTableConfig(tableConfig);
+
+    // schedule purge tasks
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+    assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
+    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    waitForTaskToComplete();
+
+    // Check that metadata contains expected values
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      // Check purge time
+      assertTrue(
+          metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
+    }
+
+    // 52 rows with ArrTime = 1
+    // 115545 total rows
+    // Expecting 115545 - 52 = 115493 rows after purging
+    // It might take some time for server to load the purged segments
+    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE) == 115493,
+        60_000L, "Failed to get expected purged records");
+
+    // Drop the table
+    dropOfflineTable(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(offlineTableName);
+  }
+
   protected void verifyTableDelete(String tableNameWithType) {
     TestUtils.waitForCondition(input -> {
       // Check if the segment lineage is cleaned up
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
index 542bbde7da..267ed0874e 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.minion.SegmentPurger;
 import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
@@ -44,7 +45,6 @@ public class PurgeTaskExecutor extends BaseSingleSegmentConversionExecutor {
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
 
-    TableConfig tableConfig = getTableConfig(tableNameWithType);
     SegmentPurger.RecordPurgerFactory recordPurgerFactory = MINION_CONTEXT.getRecordPurgerFactory();
     SegmentPurger.RecordPurger recordPurger =
         recordPurgerFactory != null ? recordPurgerFactory.getRecordPurger(rawTableName) : null;
@@ -52,8 +52,11 @@ public class PurgeTaskExecutor extends BaseSingleSegmentConversionExecutor {
     SegmentPurger.RecordModifier recordModifier =
         recordModifierFactory != null ? recordModifierFactory.getRecordModifier(rawTableName) : null;
 
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
     _eventObserver.notifyProgress(pinotTaskConfig, "Purging segment: " + indexDir);
-    SegmentPurger segmentPurger = new SegmentPurger(indexDir, workingDir, tableConfig, recordPurger, recordModifier);
+    SegmentPurger segmentPurger =
+        new SegmentPurger(indexDir, workingDir, tableConfig, schema, recordPurger, recordModifier);
     File purgedSegmentFile = segmentPurger.purgeSegment();
     if (purgedSegmentFile == null) {
       purgedSegmentFile = indexDir;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
index d8f955be8a..29a2cdd25a 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
@@ -26,6 +26,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -95,6 +96,8 @@ public class PurgeTaskExecutorTest {
     ZkHelixPropertyStore<ZNRecord> helixPropertyStore = Mockito.mock(ZkHelixPropertyStore.class);
     Mockito.when(helixPropertyStore.get("/CONFIGS/TABLE/testTable_OFFLINE", null, AccessOption.PERSISTENT))
         .thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+    Mockito.when(helixPropertyStore.get("/SCHEMAS/testTable", null, AccessOption.PERSISTENT))
+        .thenReturn(SchemaUtils.toZNRecord(schema));
     minionContext.setHelixPropertyStore(helixPropertyStore);
     minionContext.setRecordPurgerFactory(rawTableName -> {
       if (rawTableName.equals(TABLE_NAME)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org