You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2023/06/08 04:57:30 UTC
[pinot] 01/01: Bug Fix: Segment Purger cannot purge old segments after schema evolution (#10869)
This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch minion-hotfix-schema-evolution-2
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 68e04e3c5899adc563ba81908e909f057eb39fc8
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Wed Jun 7 20:55:29 2023 -0700
Bug Fix: Segment Purger cannot purge old segments after schema evolution (#10869)
---
.../controller/helix/ControllerRequestClient.java | 10 ++
.../pinot/controller/helix/ControllerTest.java | 5 +
.../apache/pinot/core/minion/SegmentPurger.java | 9 +-
.../pinot/core/minion/SegmentPurgerTest.java | 7 +-
.../tests/PurgeMinionClusterIntegrationTest.java | 151 +++++++++++++++------
.../minion/tasks/purge/PurgeTaskExecutor.java | 7 +-
.../minion/tasks/purge/PurgeTaskExecutorTest.java | 3 +
7 files changed, 140 insertions(+), 52 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 2da40ef3e5..e6aa83dea1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -81,6 +81,16 @@ public class ControllerRequestClient {
}
}
+ public void updateSchema(Schema schema)
+ throws IOException {
+ String url = _controllerRequestURLBuilder.forSchemaUpdate(schema.getSchemaName());
+ try {
+ HttpClient.wrapAndThrowHttpException(_httpClient.sendMultipartPutRequest(url, schema.toSingleLineJsonString()));
+ } catch (HttpErrorStatusException e) {
+ throw new IOException(e);
+ }
+ }
+
public void deleteSchema(String schemaName)
throws IOException {
String url = _controllerRequestURLBuilder.forSchemaDelete(schemaName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 41b34a4911..63f575293d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -622,6 +622,11 @@ public class ControllerTest {
getControllerRequestClient().addSchema(schema);
}
+ public void updateSchema(Schema schema)
+ throws IOException {
+ getControllerRequestClient().updateSchema(schema);
+ }
+
public Schema getSchema(String schemaName) {
Schema schema = _helixResourceManager.getSchema(schemaName);
assertNotNull(schema);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 2b65508585..4094b6b6e3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -28,6 +28,7 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
@@ -45,19 +46,21 @@ public class SegmentPurger {
private final File _indexDir;
private final File _workingDir;
private final TableConfig _tableConfig;
+ private final Schema _schema;
private final RecordPurger _recordPurger;
private final RecordModifier _recordModifier;
private int _numRecordsPurged;
private int _numRecordsModified;
- public SegmentPurger(File indexDir, File workingDir, TableConfig tableConfig, @Nullable RecordPurger recordPurger,
- @Nullable RecordModifier recordModifier) {
+ public SegmentPurger(File indexDir, File workingDir, TableConfig tableConfig, Schema schema,
+ @Nullable RecordPurger recordPurger, @Nullable RecordModifier recordModifier) {
Preconditions.checkArgument(recordPurger != null || recordModifier != null,
"At least one of record purger and modifier should be non-null");
_indexDir = indexDir;
_workingDir = workingDir;
_tableConfig = tableConfig;
+ _schema = schema;
_recordPurger = recordPurger;
_recordModifier = recordModifier;
}
@@ -79,7 +82,7 @@ public class SegmentPurger {
return null;
}
- SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, segmentMetadata.getSchema());
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
config.setOutDir(_workingDir.getPath());
config.setSegmentName(segmentName);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
index d29c0d94e3..2dd6b37a3e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
@@ -67,6 +67,7 @@ public class SegmentPurgerTest {
private static final String D2 = "d2";
private TableConfig _tableConfig;
+ private Schema _schema;
private File _originalIndexDir;
private int _expectedNumRecordsPurged;
private int _expectedNumRecordsModified;
@@ -79,7 +80,7 @@ public class SegmentPurgerTest {
_tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setInvertedIndexColumns(Collections.singletonList(D1)).setCreateInvertedIndexDuringSegmentGeneration(true)
.build();
- Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT)
+ _schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT)
.addSingleValueDimension(D2, FieldSpec.DataType.INT).build();
List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
@@ -98,7 +99,7 @@ public class SegmentPurgerTest {
}
GenericRowRecordReader genericRowRecordReader = new GenericRowRecordReader(rows);
- SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, schema);
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
config.setSegmentName(SEGMENT_NAME);
@@ -125,7 +126,7 @@ public class SegmentPurgerTest {
};
SegmentPurger segmentPurger =
- new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, recordPurger, recordModifier);
+ new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, _schema, recordPurger, recordModifier);
File purgedIndexDir = segmentPurger.purgeSegment();
// Check the purge/modify counter in segment purger
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index 284e7655b5..c4ba131f6d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.integration.tests;
+import com.google.common.collect.ImmutableList;
import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -34,8 +36,11 @@ import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
@@ -56,25 +61,21 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
private static final String PURGE_FIRST_RUN_TABLE = "myTable1";
private static final String PURGE_DELTA_PASSED_TABLE = "myTable2";
private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
+ private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE = "myTable4";
+
protected PinotHelixTaskResourceManager _helixTaskResourceManager;
protected PinotTaskManager _taskManager;
protected PinotHelixResourceManager _pinotHelixResourceManager;
protected String _tableName;
- protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
- protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
- protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");
-
- protected final File _tarDir1 = new File(_tempDir, "tarDir1");
- protected final File _tarDir2 = new File(_tempDir, "tarDir2");
- protected final File _tarDir3 = new File(_tempDir, "tarDir3");
+ protected final File _segmentDataDir = new File(_tempDir, "segmentDataDir");
+ protected final File _segmentTarDir = new File(_tempDir, "segmentTarDir");
@BeforeClass
public void setUp()
throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _tarDir1, _segmentDir2, _tarDir2, _segmentDir3,
- _tarDir3);
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDataDir, _segmentTarDir);
// Start the Pinot cluster
startZk();
@@ -82,37 +83,38 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
startBrokers(1);
startServers(1);
- // Create and upload the schema and table config
- Schema schema = createSchema();
- addSchema(schema);
- setTableName(PURGE_DELTA_NOT_PASSED_TABLE);
- TableConfig purgeDeltaNotPassedTableConfig = createOfflineTableConfig();
- purgeDeltaNotPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
- setTableName(PURGE_FIRST_RUN_TABLE);
- TableConfig purgeTableConfig = createOfflineTableConfig();
- purgeTableConfig.setTaskConfig(getPurgeTaskConfig());
-
- setTableName(PURGE_DELTA_PASSED_TABLE);
- TableConfig purgeDeltaPassedTableConfig = createOfflineTableConfig();
- purgeDeltaPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
-
- addTableConfig(purgeTableConfig);
- addTableConfig(purgeDeltaPassedTableConfig);
- addTableConfig(purgeDeltaNotPassedTableConfig);
+ List<String> allTables = ImmutableList.of(
+ PURGE_FIRST_RUN_TABLE,
+ PURGE_DELTA_PASSED_TABLE,
+ PURGE_DELTA_NOT_PASSED_TABLE,
+ PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
+ );
+ Schema schema = null;
+ TableConfig tableConfig = null;
+ for (String tableName : allTables) {
+ // create and upload schema
+ schema = createSchema();
+ schema.setSchemaName(tableName);
+ addSchema(schema);
+
+ // create and upload table config
+ setTableName(tableName);
+ tableConfig = createOfflineTableConfig();
+ tableConfig.setTaskConfig(getPurgeTaskConfig());
+ addTableConfig(tableConfig);
+ }
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
- // Create and upload segments
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeTableConfig, schema, 0, _segmentDir1, _tarDir1);
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeDeltaPassedTableConfig, schema, 0, _segmentDir2,
- _tarDir2);
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeDeltaNotPassedTableConfig, schema, 0,
- _segmentDir3, _tarDir3);
+ // Create segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDataDir,
+ _segmentTarDir);
- uploadSegments(PURGE_FIRST_RUN_TABLE, _tarDir1);
- uploadSegments(PURGE_DELTA_PASSED_TABLE, _tarDir2);
- uploadSegments(PURGE_DELTA_NOT_PASSED_TABLE, _tarDir3);
+ // Upload segments for all tables
+ for (String tableName : allTables) {
+ uploadSegments(tableName, _segmentTarDir);
+ }
startMinion();
setRecordPurger();
@@ -150,10 +152,14 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
private void setRecordPurger() {
MinionContext minionContext = MinionContext.getInstance();
minionContext.setRecordPurgerFactory(rawTableName -> {
- List<String> tableNames =
- Arrays.asList(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE);
+ List<String> tableNames = Arrays.asList(
+ PURGE_FIRST_RUN_TABLE,
+ PURGE_DELTA_PASSED_TABLE,
+ PURGE_DELTA_NOT_PASSED_TABLE,
+ PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
+ );
if (tableNames.contains(rawTableName)) {
- return row -> row.getValue("Quarter").equals(1);
+ return row -> row.getValue("ArrTime").equals(1);
} else {
return null;
}
@@ -205,11 +211,11 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
// Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
- // 28057 rows with quarter = 1
+ // 52 rows with ArrTime = 1
// 115545 totals rows
- // Expecting 115545 - 28057 = 87488 rows after purging
+ // Expecting 115545 - 52 = 115493 rows after purging
// It might take some time for server to load the purged segments
- TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_FIRST_RUN_TABLE) == 87488, 60_000L,
+ TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_FIRST_RUN_TABLE) == 115493, 60_000L,
"Failed to get expected purged records");
// Drop the table
@@ -251,11 +257,11 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
// Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
- // 28057 rows with quarter = 1
+ // 52 rows with ArrTime = 1
// 115545 totals rows
- // Expecting 115545 - 28057 = 87488 rows after purging
+ // Expecting 115545 - 52 = 115493 rows after purging
// It might take some time for server to load the purged segments
- TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_DELTA_PASSED_TABLE) == 87488, 60_000L,
+ TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_DELTA_PASSED_TABLE) == 115493, 60_000L,
"Failed to get expected purged records");
// Drop the table
@@ -302,6 +308,63 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
verifyTableDelete(offlineTableName);
}
+ /**
+ * Test purge on segments which were built by older schema and table config.
+ * Two new columns are added after segments are built and indices are defined for the new columns in the table config.
+ */
+ @Test
+ public void testPurgeOnOldSegmentsWithIndicesOnNewColumns()
+ throws Exception {
+
+ // add new columns to schema
+ Schema schema = createSchema();
+ schema.addField(new DimensionFieldSpec("ColumnABC", FieldSpec.DataType.INT, true));
+ schema.addField(new DimensionFieldSpec("ColumnXYZ", FieldSpec.DataType.INT, true));
+ schema.setSchemaName(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+ updateSchema(schema);
+
+ // add indices to the new columns
+ setTableName(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+ TableConfig tableConfig = createOfflineTableConfig();
+ tableConfig.setTaskConfig(getPurgeTaskConfig());
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+ List<String> invertedIndices = new ArrayList<>(indexingConfig.getInvertedIndexColumns());
+ invertedIndices.add("ColumnABC");
+ List<String> rangeIndices = new ArrayList<>(indexingConfig.getRangeIndexColumns());
+ rangeIndices.add("ColumnXYZ");
+ indexingConfig.setInvertedIndexColumns(invertedIndices);
+ indexingConfig.setRangeIndexColumns(rangeIndices);
+ updateTableConfig(tableConfig);
+
+ // schedule purge tasks
+ String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+ assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertTrue(_helixTaskResourceManager.getTaskQueues()
+ .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
+ assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ waitForTaskToComplete();
+
+ // Check that metadata contains expected values
+ for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+ // Check purge time
+ assertTrue(
+ metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
+ }
+
+ // 52 rows with ArrTime = 1
+ // 115545 totals rows
+ // Expecting 115545 - 52 = 115493 rows after purging
+ // It might take some time for server to load the purged segments
+ TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE) == 115493,
+ 60_000L, "Failed to get expected purged records");
+
+ // Drop the table
+ dropOfflineTable(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+
+ // Check if the task metadata is cleaned up on table deletion
+ verifyTableDelete(offlineTableName);
+ }
+
protected void verifyTableDelete(String tableNameWithType) {
TestUtils.waitForCondition(input -> {
// Check if the segment lineage is cleaned up
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
index 542bbde7da..267ed0874e 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.minion.SegmentPurger;
import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -44,7 +45,6 @@ public class PurgeTaskExecutor extends BaseSingleSegmentConversionExecutor {
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
- TableConfig tableConfig = getTableConfig(tableNameWithType);
SegmentPurger.RecordPurgerFactory recordPurgerFactory = MINION_CONTEXT.getRecordPurgerFactory();
SegmentPurger.RecordPurger recordPurger =
recordPurgerFactory != null ? recordPurgerFactory.getRecordPurger(rawTableName) : null;
@@ -52,8 +52,11 @@ public class PurgeTaskExecutor extends BaseSingleSegmentConversionExecutor {
SegmentPurger.RecordModifier recordModifier =
recordModifierFactory != null ? recordModifierFactory.getRecordModifier(rawTableName) : null;
+ TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Schema schema = getSchema(tableNameWithType);
_eventObserver.notifyProgress(pinotTaskConfig, "Purging segment: " + indexDir);
- SegmentPurger segmentPurger = new SegmentPurger(indexDir, workingDir, tableConfig, recordPurger, recordModifier);
+ SegmentPurger segmentPurger =
+ new SegmentPurger(indexDir, workingDir, tableConfig, schema, recordPurger, recordModifier);
File purgedSegmentFile = segmentPurger.purgeSegment();
if (purgedSegmentFile == null) {
purgedSegmentFile = indexDir;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
index d8f955be8a..29a2cdd25a 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
@@ -26,6 +26,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -95,6 +96,8 @@ public class PurgeTaskExecutorTest {
ZkHelixPropertyStore<ZNRecord> helixPropertyStore = Mockito.mock(ZkHelixPropertyStore.class);
Mockito.when(helixPropertyStore.get("/CONFIGS/TABLE/testTable_OFFLINE", null, AccessOption.PERSISTENT))
.thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+ Mockito.when(helixPropertyStore.get("/SCHEMAS/testTable", null, AccessOption.PERSISTENT))
+ .thenReturn(SchemaUtils.toZNRecord(schema));
minionContext.setHelixPropertyStore(helixPropertyStore);
minionContext.setRecordPurgerFactory(rawTableName -> {
if (rawTableName.equals(TABLE_NAME)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org