You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/17 06:02:44 UTC
[pinot] branch master updated: [Flaky-test] Fix flaky PinotResourceManagerTest and also clean it up (#10941)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8b2fb03d85 [Flaky-test] Fix flaky PinotResourceManagerTest and also clean it up (#10941)
8b2fb03d85 is described below
commit 8b2fb03d8532afd092c0211990e61f156fb45b73
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jun 16 23:02:39 2023 -0700
[Flaky-test] Fix flaky PinotResourceManagerTest and also clean it up (#10941)
---
.../controller/helix/PinotResourceManagerTest.java | 231 ++++++++++-----------
.../controller/utils/SegmentMetadataMockUtils.java | 4 +-
2 files changed, 111 insertions(+), 124 deletions(-)
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 445e43a871..6f4c87de04 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -23,10 +23,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
import org.apache.helix.model.IdealState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
@@ -36,65 +38,64 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+
public class PinotResourceManagerTest {
- private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance();
- private static final String OFFLINE_TABLE_NAME = "offlineResourceManagerTestTable_OFFLINE";
- private static final String REALTIME_TABLE_NAME = "realtimeResourceManagerTestTable_REALTIME";
- private static final String NUM_REPLICAS_STRING = "2";
- private static final String PARTITION_COLUMN = "Partition_Column";
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+ private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+ private static final int NUM_REPLICAS = 2;
+ private static final String PARTITION_COLUMN = "partitionColumn";
+
+ private final ControllerTest _testInstance = ControllerTest.getInstance();
+ private PinotHelixResourceManager _resourceManager;
@BeforeClass
public void setUp()
throws Exception {
- TEST_INSTANCE.setupSharedStateAndValidate();
+ _testInstance.setupSharedStateAndValidate();
+ _resourceManager = _testInstance.getHelixResourceManager();
// Adding an offline table
- TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).build();
- TEST_INSTANCE.getHelixResourceManager().addTable(offlineTableConfig);
+ TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ _resourceManager.addTable(offlineTableConfig);
// Adding an upsert enabled realtime table which consumes from a stream with 2 partitions
- Schema dummySchema = TEST_INSTANCE.createDummySchema(REALTIME_TABLE_NAME);
- TEST_INSTANCE.addSchema(dummySchema);
+ Schema dummySchema = ControllerTest.createDummySchema(RAW_TABLE_NAME);
+ _testInstance.addSchema(dummySchema);
Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
TableConfig realtimeTableConfig =
- new TableConfigBuilder(TableType.REALTIME).setStreamConfigs(streamConfigs).setTableName(REALTIME_TABLE_NAME)
- .setSchemaName(dummySchema.getSchemaName()).build();
- realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING);
- realtimeTableConfig.getValidationConfig()
- .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
- realtimeTableConfig.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL));
- TEST_INSTANCE.getHelixResourceManager().addTable(realtimeTableConfig);
+ new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
+ .setStreamConfigs(streamConfigs)
+ .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1))
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
+ _resourceManager.addTable(realtimeTableConfig);
}
@Test
public void testTableCleanupAfterRealtimeClusterException()
throws Exception {
- String invalidRealtimeTable = "invalidTable_REALTIME";
- Schema dummySchema = TEST_INSTANCE.createDummySchema(invalidRealtimeTable);
- TEST_INSTANCE.addSchema(dummySchema);
+ String invalidRawTableName = "invalidTable";
+ Schema dummySchema = ControllerTest.createDummySchema(invalidRawTableName);
+ _testInstance.addSchema(dummySchema);
- // Missing replicasPerPartition
+ // Missing stream config
TableConfig invalidRealtimeTableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName(invalidRealtimeTable)
- .setSchemaName(dummySchema.getSchemaName()).build();
-
+ new TableConfigBuilder(TableType.REALTIME).setTableName(invalidRawTableName).build();
try {
- TEST_INSTANCE.getHelixResourceManager().addTable(invalidRealtimeTableConfig);
- Assert.fail(
- "Table creation should have thrown exception due to missing stream config and replicasPerPartition in "
- + "validation config");
+ _resourceManager.addTable(invalidRealtimeTableConfig);
+ fail("Table creation should have thrown exception due to missing stream config in validation config");
} catch (Exception e) {
// expected
}
// Verify invalid table config is cleaned up
- Assert.assertNull(TEST_INSTANCE.getHelixResourceManager().getTableConfig(invalidRealtimeTable));
+ assertNull(_resourceManager.getTableConfig(invalidRealtimeTableConfig.getTableName()));
}
@Test
@@ -102,144 +103,130 @@ public class PinotResourceManagerTest {
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata("testSegment");
// Segment ZK metadata does not exist
- Assert.assertFalse(TEST_INSTANCE.getHelixResourceManager()
- .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0));
+ assertFalse(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata, 0));
// Set segment ZK metadata
- Assert.assertTrue(TEST_INSTANCE.getHelixResourceManager()
- .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata));
+ assertTrue(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata));
// Update ZK metadata
- Assert.assertEquals(TEST_INSTANCE.getHelixResourceManager()
- .getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME + "_OFFLINE", "testSegment").getVersion(), 0);
- Assert.assertTrue(TEST_INSTANCE.getHelixResourceManager()
- .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0));
- Assert.assertEquals(TEST_INSTANCE.getHelixResourceManager()
- .getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME + "_OFFLINE", "testSegment").getVersion(), 1);
- Assert.assertFalse(TEST_INSTANCE.getHelixResourceManager()
- .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0));
+ ZNRecord segmentMetadataZnRecord = _resourceManager.getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME, "testSegment");
+ assertNotNull(segmentMetadataZnRecord);
+ assertEquals(segmentMetadataZnRecord.getVersion(), 0);
+ assertTrue(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata, 0));
+ segmentMetadataZnRecord = _resourceManager.getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME, "testSegment");
+ assertNotNull(segmentMetadataZnRecord);
+ assertEquals(segmentMetadataZnRecord.getVersion(), 1);
+ assertFalse(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata, 0));
}
/**
* First tests basic segment adding/deleting.
- * Then creates 3 threads that concurrently try to add 10 segments each, and asserts that we have
- * 100 segments in the end. Then launches 5 threads again that concurrently try to delete all segments,
- * and makes sure that we have zero segments left in the end.
- * @throws Exception
+ * Then creates 3 threads that concurrently try to add 10 segments each, and asserts that we have 30 segments in the
+ * end. Then launches 3 threads again that concurrently try to delete all segments, and makes sure that we have 0
+ * segments left in the end.
*/
-
@Test
public void testBasicAndConcurrentAddingAndDeletingSegments()
throws Exception {
- final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME);
+ PinotHelixResourceManager resourceManager = _resourceManager;
- // Basic add/delete case
+ // Basic add/delete
for (int i = 1; i <= 2; i++) {
- TEST_INSTANCE.getHelixResourceManager()
- .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME),
- "downloadUrl");
+ resourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME), "downloadUrl");
}
- IdealState idealState = TEST_INSTANCE.getHelixAdmin()
- .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName);
+ IdealState idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
+ assertNotNull(idealState);
Set<String> segments = idealState.getPartitionSet();
- Assert.assertEquals(segments.size(), 2);
+ assertEquals(segments.size(), 2);
for (String segmentName : segments) {
- TEST_INSTANCE.getHelixResourceManager().deleteSegment(offlineTableName, segmentName);
+ resourceManager.deleteSegment(OFFLINE_TABLE_NAME, segmentName);
}
- idealState = TEST_INSTANCE.getHelixAdmin()
- .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName);
- Assert.assertEquals(idealState.getPartitionSet().size(), 0);
+ idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
+ assertNotNull(idealState);
+ assertEquals(idealState.getNumPartitions(), 0);
- // Concurrent segment deletion
- ExecutorService addSegmentExecutor = Executors.newFixedThreadPool(3);
+ // Concurrent add/deletion
+ ExecutorService executor = Executors.newFixedThreadPool(3);
+ Future<?>[] futures = new Future[3];
for (int i = 0; i < 3; i++) {
- addSegmentExecutor.execute(new Runnable() {
- @Override
- public void run() {
- for (int i = 0; i < 10; i++) {
- TEST_INSTANCE.getHelixResourceManager()
- .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME),
- "downloadUrl");
- }
+ futures[i] = executor.submit(() -> {
+ for (int i1 = 0; i1 < 10; i1++) {
+ resourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME), "downloadUrl");
}
});
}
- addSegmentExecutor.shutdown();
- addSegmentExecutor.awaitTermination(1, TimeUnit.MINUTES);
-
- idealState = TEST_INSTANCE.getHelixAdmin()
- .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName);
- Assert.assertEquals(idealState.getPartitionSet().size(), 30);
-
- ExecutorService deleteSegmentExecutor = Executors.newFixedThreadPool(3);
- for (final String segmentName : idealState.getPartitionSet()) {
- deleteSegmentExecutor.execute(new Runnable() {
- @Override
- public void run() {
- TEST_INSTANCE.getHelixResourceManager().deleteSegment(offlineTableName, segmentName);
- }
- });
+ for (int i = 0; i < 3; i++) {
+ futures[i].get();
+ }
+ idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
+ assertNotNull(idealState);
+ segments = idealState.getPartitionSet();
+ assertEquals(segments.size(), 30);
+
+ futures = new Future[30];
+ int index = 0;
+ for (String segment : segments) {
+ futures[index++] = executor.submit(() -> resourceManager.deleteSegment(OFFLINE_TABLE_NAME, segment));
+ }
+ for (int i = 0; i < 30; i++) {
+ futures[i].get();
}
- deleteSegmentExecutor.shutdown();
- deleteSegmentExecutor.awaitTermination(1, TimeUnit.MINUTES);
+ idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
+ assertNotNull(idealState);
+ assertEquals(idealState.getNumPartitions(), 0);
- idealState = TEST_INSTANCE.getHelixAdmin()
- .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName);
- Assert.assertEquals(idealState.getPartitionSet().size(), 0);
+ executor.shutdown();
}
@Test
public void testAddingRealtimeTableSegmentsWithPartitionIdInZkMetadata() {
// Add three segments: two from partition 0 and 1 from partition 1;
- String partition0Segment0 = "realtimeResourceManagerTestTable__aa";
- String partition0Segment1 = "realtimeResourceManagerTestTable__bb";
- String partition1Segment1 = "realtimeResourceManagerTestTable__cc";
- TEST_INSTANCE.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils
- .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition0Segment0, PARTITION_COLUMN, 0),
- "downloadUrl");
- TEST_INSTANCE.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils
- .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition0Segment1, PARTITION_COLUMN, 0),
- "downloadUrl");
- TEST_INSTANCE.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils
- .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition1Segment1, PARTITION_COLUMN, 1),
- "downloadUrl");
- Map<String, Integer> segment2PartitionId = new HashMap<>();
- segment2PartitionId.put(partition0Segment0, 0);
- segment2PartitionId.put(partition0Segment1, 0);
- segment2PartitionId.put(partition1Segment1, 1);
-
- IdealState idealState = TEST_INSTANCE.getHelixAdmin()
- .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(),
- TableNameBuilder.REALTIME.tableNameWithType(REALTIME_TABLE_NAME));
+ String partition0Segment0 = "p0s0";
+ String partition0Segment1 = "p0s1";
+ String partition1Segment0 = "p1s0";
+ _resourceManager.addNewSegment(REALTIME_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME, partition0Segment0,
+ PARTITION_COLUMN, 0), "downloadUrl");
+ _resourceManager.addNewSegment(REALTIME_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME, partition0Segment1,
+ PARTITION_COLUMN, 0), "downloadUrl");
+ _resourceManager.addNewSegment(REALTIME_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME, partition1Segment0,
+ PARTITION_COLUMN, 1), "downloadUrl");
+
+ IdealState idealState = _resourceManager.getTableIdealState(REALTIME_TABLE_NAME);
+ assertNotNull(idealState);
Set<String> segments = idealState.getPartitionSet();
- Assert.assertEquals(segments.size(), 5);
- Assert.assertTrue(segments.contains(partition0Segment0));
- Assert.assertTrue(segments.contains(partition0Segment1));
- Assert.assertTrue(segments.contains(partition1Segment1));
+ // 2 consuming segments, 3 uploaded segments
+ assertEquals(segments.size(), 5);
+ assertTrue(segments.contains(partition0Segment0));
+ assertTrue(segments.contains(partition0Segment1));
+ assertTrue(segments.contains(partition1Segment0));
// Check the segments of the same partition is assigned to the same set of servers.
- Map<Integer, Set<String>> segmentAssignment = new HashMap<>();
+ Map<Integer, Set<String>> partitionIdToServersMap = new HashMap<>();
for (String segment : segments) {
- Integer partitionId;
+ int partitionId;
LLCSegmentName llcSegmentName = LLCSegmentName.of(segment);
if (llcSegmentName != null) {
partitionId = llcSegmentName.getPartitionGroupId();
} else {
- partitionId = segment2PartitionId.get(segment);
+ partitionId = Integer.parseInt(segment.substring(1, 2));
}
- Assert.assertNotNull(partitionId);
Set<String> instances = idealState.getInstanceSet(segment);
- if (segmentAssignment.containsKey(partitionId)) {
- Assert.assertEquals(instances, segmentAssignment.get(partitionId));
+ if (partitionIdToServersMap.containsKey(partitionId)) {
+ assertEquals(instances, partitionIdToServersMap.get(partitionId));
} else {
- segmentAssignment.put(partitionId, instances);
+ partitionIdToServersMap.put(partitionId, instances);
}
}
}
@AfterClass
public void tearDown() {
- TEST_INSTANCE.cleanup();
+ _testInstance.cleanup();
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index 4a47d56512..f411be0daa 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -71,7 +71,7 @@ public class SegmentMetadataMockUtils {
return segmentZKMetadata;
}
- public static SegmentMetadata mockSegmentMetadataWithPartitionInfo(String tableName, String segmentName,
+ public static SegmentMetadata mockSegmentMetadataWithPartitionInfo(String rawTableName, String segmentName,
String columnName, int partitionNumber) {
ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
Set<Integer> partitions = Collections.singleton(partitionNumber);
@@ -82,7 +82,7 @@ public class SegmentMetadataMockUtils {
if (columnName != null) {
when(segmentMetadata.getColumnMetadataFor(columnName)).thenReturn(columnMetadata);
}
- when(segmentMetadata.getTableName()).thenReturn(tableName);
+ when(segmentMetadata.getTableName()).thenReturn(rawTableName);
when(segmentMetadata.getName()).thenReturn(segmentName);
when(segmentMetadata.getCrc()).thenReturn("0");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org