You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2023/06/08 04:57:29 UTC

[pinot] branch minion-hotfix-schema-evolution-2 created (now 68e04e3c58)

This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a change to branch minion-hotfix-schema-evolution-2
in repository https://gitbox.apache.org/repos/asf/pinot.git


      at 68e04e3c58 Bug Fix: Segment Purger cannot purge old segments after schema evolution (#10869)

This branch includes the following new commits:

     new 68e04e3c58 Bug Fix: Segment Purger cannot purge old segments after schema evolution (#10869)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[pinot] 01/01: Bug Fix: Segment Purger cannot purge old segments after schema evolution (#10869)

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch minion-hotfix-schema-evolution-2
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 68e04e3c5899adc563ba81908e909f057eb39fc8
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Wed Jun 7 20:55:29 2023 -0700

    Bug Fix: Segment Purger cannot purge old segments after schema evolution (#10869)
---
 .../controller/helix/ControllerRequestClient.java  |  10 ++
 .../pinot/controller/helix/ControllerTest.java     |   5 +
 .../apache/pinot/core/minion/SegmentPurger.java    |   9 +-
 .../pinot/core/minion/SegmentPurgerTest.java       |   7 +-
 .../tests/PurgeMinionClusterIntegrationTest.java   | 151 +++++++++++++++------
 .../minion/tasks/purge/PurgeTaskExecutor.java      |   7 +-
 .../minion/tasks/purge/PurgeTaskExecutorTest.java  |   3 +
 7 files changed, 140 insertions(+), 52 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 2da40ef3e5..e6aa83dea1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -81,6 +81,16 @@ public class ControllerRequestClient {
     }
   }
 
+  public void updateSchema(Schema schema)
+      throws IOException {
+    String url = _controllerRequestURLBuilder.forSchemaUpdate(schema.getSchemaName());
+    try {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendMultipartPutRequest(url, schema.toSingleLineJsonString()));
+    } catch (HttpErrorStatusException e) {
+      throw new IOException(e);
+    }
+  }
+
   public void deleteSchema(String schemaName)
       throws IOException {
     String url = _controllerRequestURLBuilder.forSchemaDelete(schemaName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 41b34a4911..63f575293d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -622,6 +622,11 @@ public class ControllerTest {
     getControllerRequestClient().addSchema(schema);
   }
 
+  public void updateSchema(Schema schema)
+      throws IOException {
+    getControllerRequestClient().updateSchema(schema);
+  }
+
   public Schema getSchema(String schemaName) {
     Schema schema = _helixResourceManager.getSchema(schemaName);
     assertNotNull(schema);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 2b65508585..4094b6b6e3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -28,6 +28,7 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
@@ -45,19 +46,21 @@ public class SegmentPurger {
   private final File _indexDir;
   private final File _workingDir;
   private final TableConfig _tableConfig;
+  private final Schema _schema;
   private final RecordPurger _recordPurger;
   private final RecordModifier _recordModifier;
 
   private int _numRecordsPurged;
   private int _numRecordsModified;
 
-  public SegmentPurger(File indexDir, File workingDir, TableConfig tableConfig, @Nullable RecordPurger recordPurger,
-      @Nullable RecordModifier recordModifier) {
+  public SegmentPurger(File indexDir, File workingDir, TableConfig tableConfig, Schema schema,
+      @Nullable RecordPurger recordPurger, @Nullable RecordModifier recordModifier) {
     Preconditions.checkArgument(recordPurger != null || recordModifier != null,
         "At least one of record purger and modifier should be non-null");
     _indexDir = indexDir;
     _workingDir = workingDir;
     _tableConfig = tableConfig;
+    _schema = schema;
     _recordPurger = recordPurger;
     _recordModifier = recordModifier;
   }
@@ -79,7 +82,7 @@ public class SegmentPurger {
         return null;
       }
 
-      SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, segmentMetadata.getSchema());
+      SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
       config.setOutDir(_workingDir.getPath());
       config.setSegmentName(segmentName);
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
index d29c0d94e3..2dd6b37a3e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
@@ -67,6 +67,7 @@ public class SegmentPurgerTest {
   private static final String D2 = "d2";
 
   private TableConfig _tableConfig;
+  private Schema _schema;
   private File _originalIndexDir;
   private int _expectedNumRecordsPurged;
   private int _expectedNumRecordsModified;
@@ -79,7 +80,7 @@ public class SegmentPurgerTest {
     _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setInvertedIndexColumns(Collections.singletonList(D1)).setCreateInvertedIndexDuringSegmentGeneration(true)
         .build();
-    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT)
+    _schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT)
         .addSingleValueDimension(D2, FieldSpec.DataType.INT).build();
 
     List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
@@ -98,7 +99,7 @@ public class SegmentPurgerTest {
     }
     GenericRowRecordReader genericRowRecordReader = new GenericRowRecordReader(rows);
 
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, schema);
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
     config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
     config.setSegmentName(SEGMENT_NAME);
 
@@ -125,7 +126,7 @@ public class SegmentPurgerTest {
     };
 
     SegmentPurger segmentPurger =
-        new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, recordPurger, recordModifier);
+        new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, _schema, recordPurger, recordModifier);
     File purgedIndexDir = segmentPurger.purgeSegment();
 
     // Check the purge/modify counter in segment purger
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index 284e7655b5..c4ba131f6d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.google.common.collect.ImmutableList;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,8 +36,11 @@ import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
@@ -56,25 +61,21 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
   private static final String PURGE_FIRST_RUN_TABLE = "myTable1";
   private static final String PURGE_DELTA_PASSED_TABLE = "myTable2";
   private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
+  private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE = "myTable4";
+
 
   protected PinotHelixTaskResourceManager _helixTaskResourceManager;
   protected PinotTaskManager _taskManager;
   protected PinotHelixResourceManager _pinotHelixResourceManager;
   protected String _tableName;
 
-  protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
-  protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
-  protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");
-
-  protected final File _tarDir1 = new File(_tempDir, "tarDir1");
-  protected final File _tarDir2 = new File(_tempDir, "tarDir2");
-  protected final File _tarDir3 = new File(_tempDir, "tarDir3");
+  protected final File _segmentDataDir = new File(_tempDir, "segmentDataDir");
+  protected final File _segmentTarDir = new File(_tempDir, "segmentTarDir");
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _tarDir1, _segmentDir2, _tarDir2, _segmentDir3,
-        _tarDir3);
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDataDir, _segmentTarDir);
 
     // Start the Pinot cluster
     startZk();
@@ -82,37 +83,38 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
     startBrokers(1);
     startServers(1);
 
-    // Create and upload the schema and table config
-    Schema schema = createSchema();
-    addSchema(schema);
-    setTableName(PURGE_DELTA_NOT_PASSED_TABLE);
-    TableConfig purgeDeltaNotPassedTableConfig = createOfflineTableConfig();
-    purgeDeltaNotPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
-    setTableName(PURGE_FIRST_RUN_TABLE);
-    TableConfig purgeTableConfig = createOfflineTableConfig();
-    purgeTableConfig.setTaskConfig(getPurgeTaskConfig());
-
-    setTableName(PURGE_DELTA_PASSED_TABLE);
-    TableConfig purgeDeltaPassedTableConfig = createOfflineTableConfig();
-    purgeDeltaPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
-
-    addTableConfig(purgeTableConfig);
-    addTableConfig(purgeDeltaPassedTableConfig);
-    addTableConfig(purgeDeltaNotPassedTableConfig);
+    List<String> allTables = ImmutableList.of(
+        PURGE_FIRST_RUN_TABLE,
+        PURGE_DELTA_PASSED_TABLE,
+        PURGE_DELTA_NOT_PASSED_TABLE,
+        PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
+    );
+    Schema schema = null;
+    TableConfig tableConfig = null;
+    for (String tableName : allTables) {
+      // create and upload schema
+      schema = createSchema();
+      schema.setSchemaName(tableName);
+      addSchema(schema);
+
+      // create and upload table config
+      setTableName(tableName);
+      tableConfig = createOfflineTableConfig();
+      tableConfig.setTaskConfig(getPurgeTaskConfig());
+      addTableConfig(tableConfig);
+    }
 
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
-    // Create and upload segments
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeTableConfig, schema, 0, _segmentDir1, _tarDir1);
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeDeltaPassedTableConfig, schema, 0, _segmentDir2,
-        _tarDir2);
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeDeltaNotPassedTableConfig, schema, 0,
-        _segmentDir3, _tarDir3);
+    // Create segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDataDir,
+        _segmentTarDir);
 
-    uploadSegments(PURGE_FIRST_RUN_TABLE, _tarDir1);
-    uploadSegments(PURGE_DELTA_PASSED_TABLE, _tarDir2);
-    uploadSegments(PURGE_DELTA_NOT_PASSED_TABLE, _tarDir3);
+    // Upload segments for all tables
+    for (String tableName : allTables) {
+      uploadSegments(tableName, _segmentTarDir);
+    }
 
     startMinion();
     setRecordPurger();
@@ -150,10 +152,14 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
   private void setRecordPurger() {
     MinionContext minionContext = MinionContext.getInstance();
     minionContext.setRecordPurgerFactory(rawTableName -> {
-      List<String> tableNames =
-          Arrays.asList(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE);
+      List<String> tableNames = Arrays.asList(
+          PURGE_FIRST_RUN_TABLE,
+          PURGE_DELTA_PASSED_TABLE,
+          PURGE_DELTA_NOT_PASSED_TABLE,
+          PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
+      );
       if (tableNames.contains(rawTableName)) {
-        return row -> row.getValue("Quarter").equals(1);
+        return row -> row.getValue("ArrTime").equals(1);
       } else {
         return null;
       }
@@ -205,11 +211,11 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
     // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
     assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
 
-    // 28057 rows with quarter = 1
+    // 52 rows with ArrTime = 1
     // 115545 totals rows
-    // Expecting 115545 - 28057 = 87488 rows after purging
+    // Expecting 115545 - 52 = 115493 rows after purging
     // It might take some time for server to load the purged segments
-    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_FIRST_RUN_TABLE) == 87488, 60_000L,
+    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_FIRST_RUN_TABLE) == 115493, 60_000L,
         "Failed to get expected purged records");
 
     // Drop the table
@@ -251,11 +257,11 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
     // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
     assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
 
-    // 28057 rows with quarter = 1
+    // 52 rows with ArrTime = 1
     // 115545 totals rows
-    // Expecting 115545 - 28057 = 87488 rows after purging
+    // Expecting 115545 - 52 = 115493 rows after purging
     // It might take some time for server to load the purged segments
-    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_DELTA_PASSED_TABLE) == 87488, 60_000L,
+    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_DELTA_PASSED_TABLE) == 115493, 60_000L,
         "Failed to get expected purged records");
 
     // Drop the table
@@ -302,6 +308,63 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
     verifyTableDelete(offlineTableName);
   }
 
+  /**
+   * Test purge on segments which were built by older schema and table config.
+   * Two new columns are added after segments are built and indices are defined for the new columns in the table config.
+   */
+  @Test
+  public void testPurgeOnOldSegmentsWithIndicesOnNewColumns()
+      throws Exception {
+
+    // add new columns to schema
+    Schema schema = createSchema();
+    schema.addField(new DimensionFieldSpec("ColumnABC", FieldSpec.DataType.INT, true));
+    schema.addField(new DimensionFieldSpec("ColumnXYZ", FieldSpec.DataType.INT, true));
+    schema.setSchemaName(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+    updateSchema(schema);
+
+    // add indices to the new columns
+    setTableName(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+    TableConfig tableConfig = createOfflineTableConfig();
+    tableConfig.setTaskConfig(getPurgeTaskConfig());
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    List<String> invertedIndices = new ArrayList<>(indexingConfig.getInvertedIndexColumns());
+    invertedIndices.add("ColumnABC");
+    List<String> rangeIndices = new ArrayList<>(indexingConfig.getRangeIndexColumns());
+    rangeIndices.add("ColumnXYZ");
+    indexingConfig.setInvertedIndexColumns(invertedIndices);
+    indexingConfig.setRangeIndexColumns(rangeIndices);
+    updateTableConfig(tableConfig);
+
+    // schedule purge tasks
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+    assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
+    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    waitForTaskToComplete();
+
+    // Check that metadata contains expected values
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      // Check purge time
+      assertTrue(
+          metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
+    }
+
+    // 52 rows with ArrTime = 1
+    // 115545 totals rows
+    // Expecting 115545 - 52 = 115493 rows after purging
+    // It might take some time for server to load the purged segments
+    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE) == 115493,
+        60_000L, "Failed to get expected purged records");
+
+    // Drop the table
+    dropOfflineTable(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(offlineTableName);
+  }
+
   protected void verifyTableDelete(String tableNameWithType) {
     TestUtils.waitForCondition(input -> {
       // Check if the segment lineage is cleaned up
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
index 542bbde7da..267ed0874e 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.minion.SegmentPurger;
 import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
@@ -44,7 +45,6 @@ public class PurgeTaskExecutor extends BaseSingleSegmentConversionExecutor {
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
 
-    TableConfig tableConfig = getTableConfig(tableNameWithType);
     SegmentPurger.RecordPurgerFactory recordPurgerFactory = MINION_CONTEXT.getRecordPurgerFactory();
     SegmentPurger.RecordPurger recordPurger =
         recordPurgerFactory != null ? recordPurgerFactory.getRecordPurger(rawTableName) : null;
@@ -52,8 +52,11 @@ public class PurgeTaskExecutor extends BaseSingleSegmentConversionExecutor {
     SegmentPurger.RecordModifier recordModifier =
         recordModifierFactory != null ? recordModifierFactory.getRecordModifier(rawTableName) : null;
 
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
     _eventObserver.notifyProgress(pinotTaskConfig, "Purging segment: " + indexDir);
-    SegmentPurger segmentPurger = new SegmentPurger(indexDir, workingDir, tableConfig, recordPurger, recordModifier);
+    SegmentPurger segmentPurger =
+        new SegmentPurger(indexDir, workingDir, tableConfig, schema, recordPurger, recordModifier);
     File purgedSegmentFile = segmentPurger.purgeSegment();
     if (purgedSegmentFile == null) {
       purgedSegmentFile = indexDir;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
index d8f955be8a..29a2cdd25a 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
@@ -26,6 +26,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -95,6 +96,8 @@ public class PurgeTaskExecutorTest {
     ZkHelixPropertyStore<ZNRecord> helixPropertyStore = Mockito.mock(ZkHelixPropertyStore.class);
     Mockito.when(helixPropertyStore.get("/CONFIGS/TABLE/testTable_OFFLINE", null, AccessOption.PERSISTENT))
         .thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+    Mockito.when(helixPropertyStore.get("/SCHEMAS/testTable", null, AccessOption.PERSISTENT))
+        .thenReturn(SchemaUtils.toZNRecord(schema));
     minionContext.setHelixPropertyStore(helixPropertyStore);
     minionContext.setRecordPurgerFactory(rawTableName -> {
       if (rawTableName.equals(TABLE_NAME)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org