You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/17 06:02:44 UTC

[pinot] branch master updated: [Flaky-test] Fix flaky PinotResourceManagerTest and also clean it up (#10941)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b2fb03d85 [Flaky-test] Fix flaky PinotResourceManagerTest and also clean it up (#10941)
8b2fb03d85 is described below

commit 8b2fb03d8532afd092c0211990e61f156fb45b73
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jun 16 23:02:39 2023 -0700

    [Flaky-test] Fix flaky PinotResourceManagerTest and also clean it up (#10941)
---
 .../controller/helix/PinotResourceManagerTest.java | 231 ++++++++++-----------
 .../controller/utils/SegmentMetadataMockUtils.java |   4 +-
 2 files changed, 111 insertions(+), 124 deletions(-)

diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 445e43a871..6f4c87de04 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -23,10 +23,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
@@ -36,65 +38,64 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.*;
+
 
 public class PinotResourceManagerTest {
-  private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance();
-  private static final String OFFLINE_TABLE_NAME = "offlineResourceManagerTestTable_OFFLINE";
-  private static final String REALTIME_TABLE_NAME = "realtimeResourceManagerTestTable_REALTIME";
-  private static final String NUM_REPLICAS_STRING = "2";
-  private static final String PARTITION_COLUMN = "Partition_Column";
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+  private static final int NUM_REPLICAS = 2;
+  private static final String PARTITION_COLUMN = "partitionColumn";
+
+  private final ControllerTest _testInstance = ControllerTest.getInstance();
+  private PinotHelixResourceManager _resourceManager;
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    TEST_INSTANCE.setupSharedStateAndValidate();
+    _testInstance.setupSharedStateAndValidate();
+    _resourceManager = _testInstance.getHelixResourceManager();
 
     // Adding an offline table
-    TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).build();
-    TEST_INSTANCE.getHelixResourceManager().addTable(offlineTableConfig);
+    TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    _resourceManager.addTable(offlineTableConfig);
 
     // Adding an upsert enabled realtime table which consumes from a stream with 2 partitions
-    Schema dummySchema = TEST_INSTANCE.createDummySchema(REALTIME_TABLE_NAME);
-    TEST_INSTANCE.addSchema(dummySchema);
+    Schema dummySchema = ControllerTest.createDummySchema(RAW_TABLE_NAME);
+    _testInstance.addSchema(dummySchema);
     Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
     TableConfig realtimeTableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setStreamConfigs(streamConfigs).setTableName(REALTIME_TABLE_NAME)
-            .setSchemaName(dummySchema.getSchemaName()).build();
-    realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING);
-    realtimeTableConfig.getValidationConfig()
-        .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
-    realtimeTableConfig.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL));
-    TEST_INSTANCE.getHelixResourceManager().addTable(realtimeTableConfig);
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
+            .setStreamConfigs(streamConfigs)
+            .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1))
+            .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
+    _resourceManager.addTable(realtimeTableConfig);
   }
 
   @Test
   public void testTableCleanupAfterRealtimeClusterException()
       throws Exception {
-    String invalidRealtimeTable = "invalidTable_REALTIME";
-    Schema dummySchema = TEST_INSTANCE.createDummySchema(invalidRealtimeTable);
-    TEST_INSTANCE.addSchema(dummySchema);
+    String invalidRawTableName = "invalidTable";
+    Schema dummySchema = ControllerTest.createDummySchema(invalidRawTableName);
+    _testInstance.addSchema(dummySchema);
 
-    // Missing replicasPerPartition
+    // Missing stream config
     TableConfig invalidRealtimeTableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(invalidRealtimeTable)
-            .setSchemaName(dummySchema.getSchemaName()).build();
-
+        new TableConfigBuilder(TableType.REALTIME).setTableName(invalidRawTableName).build();
     try {
-      TEST_INSTANCE.getHelixResourceManager().addTable(invalidRealtimeTableConfig);
-      Assert.fail(
-          "Table creation should have thrown exception due to missing stream config and replicasPerPartition in "
-              + "validation config");
+      _resourceManager.addTable(invalidRealtimeTableConfig);
+      fail("Table creation should have thrown exception due to missing stream config in validation config");
     } catch (Exception e) {
       // expected
     }
 
     // Verify invalid table config is cleaned up
-    Assert.assertNull(TEST_INSTANCE.getHelixResourceManager().getTableConfig(invalidRealtimeTable));
+    assertNull(_resourceManager.getTableConfig(invalidRealtimeTableConfig.getTableName()));
   }
 
   @Test
@@ -102,144 +103,130 @@ public class PinotResourceManagerTest {
     SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata("testSegment");
 
     // Segment ZK metadata does not exist
-    Assert.assertFalse(TEST_INSTANCE.getHelixResourceManager()
-        .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0));
+    assertFalse(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata, 0));
 
     // Set segment ZK metadata
-    Assert.assertTrue(TEST_INSTANCE.getHelixResourceManager()
-        .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata));
+    assertTrue(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata));
 
     // Update ZK metadata
-    Assert.assertEquals(TEST_INSTANCE.getHelixResourceManager()
-        .getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME + "_OFFLINE", "testSegment").getVersion(), 0);
-    Assert.assertTrue(TEST_INSTANCE.getHelixResourceManager()
-        .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0));
-    Assert.assertEquals(TEST_INSTANCE.getHelixResourceManager()
-        .getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME + "_OFFLINE", "testSegment").getVersion(), 1);
-    Assert.assertFalse(TEST_INSTANCE.getHelixResourceManager()
-        .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0));
+    ZNRecord segmentMetadataZnRecord = _resourceManager.getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME, "testSegment");
+    assertNotNull(segmentMetadataZnRecord);
+    assertEquals(segmentMetadataZnRecord.getVersion(), 0);
+    assertTrue(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata, 0));
+    segmentMetadataZnRecord = _resourceManager.getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME, "testSegment");
+    assertNotNull(segmentMetadataZnRecord);
+    assertEquals(segmentMetadataZnRecord.getVersion(), 1);
+    assertFalse(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata, 0));
   }
 
   /**
    * First tests basic segment adding/deleting.
-   * Then creates 3 threads that concurrently try to add 10 segments each, and asserts that we have
-   * 100 segments in the end. Then launches 5 threads again that concurrently try to delete all segments,
-   * and makes sure that we have zero segments left in the end.
-   * @throws Exception
+   * Then creates 3 threads that concurrently try to add 10 segments each, and asserts that we have 30 segments in the
+   * end. Then launches 3 threads again that concurrently try to delete all segments, and makes sure that we have 0
+   * segments left in the end.
    */
-
   @Test
   public void testBasicAndConcurrentAddingAndDeletingSegments()
       throws Exception {
-    final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME);
+    PinotHelixResourceManager resourceManager = _resourceManager;
 
-    // Basic add/delete case
+    // Basic add/delete
     for (int i = 1; i <= 2; i++) {
-      TEST_INSTANCE.getHelixResourceManager()
-          .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME),
-              "downloadUrl");
+      resourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME), "downloadUrl");
     }
-    IdealState idealState = TEST_INSTANCE.getHelixAdmin()
-        .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName);
+    IdealState idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
+    assertNotNull(idealState);
     Set<String> segments = idealState.getPartitionSet();
-    Assert.assertEquals(segments.size(), 2);
+    assertEquals(segments.size(), 2);
 
     for (String segmentName : segments) {
-      TEST_INSTANCE.getHelixResourceManager().deleteSegment(offlineTableName, segmentName);
+      resourceManager.deleteSegment(OFFLINE_TABLE_NAME, segmentName);
     }
-    idealState = TEST_INSTANCE.getHelixAdmin()
-        .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName);
-    Assert.assertEquals(idealState.getPartitionSet().size(), 0);
+    idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
+    assertNotNull(idealState);
+    assertEquals(idealState.getNumPartitions(), 0);
 
-    // Concurrent segment deletion
-    ExecutorService addSegmentExecutor = Executors.newFixedThreadPool(3);
+    // Concurrent add/deletion
+    ExecutorService executor = Executors.newFixedThreadPool(3);
+    Future<?>[] futures = new Future[3];
     for (int i = 0; i < 3; i++) {
-      addSegmentExecutor.execute(new Runnable() {
-        @Override
-        public void run() {
-          for (int i = 0; i < 10; i++) {
-            TEST_INSTANCE.getHelixResourceManager()
-                .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME),
-                    "downloadUrl");
-          }
+      futures[i] = executor.submit(() -> {
+        for (int i1 = 0; i1 < 10; i1++) {
+          resourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+              SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME), "downloadUrl");
         }
       });
     }
-    addSegmentExecutor.shutdown();
-    addSegmentExecutor.awaitTermination(1, TimeUnit.MINUTES);
-
-    idealState = TEST_INSTANCE.getHelixAdmin()
-        .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName);
-    Assert.assertEquals(idealState.getPartitionSet().size(), 30);
-
-    ExecutorService deleteSegmentExecutor = Executors.newFixedThreadPool(3);
-    for (final String segmentName : idealState.getPartitionSet()) {
-      deleteSegmentExecutor.execute(new Runnable() {
-        @Override
-        public void run() {
-          TEST_INSTANCE.getHelixResourceManager().deleteSegment(offlineTableName, segmentName);
-        }
-      });
+    for (int i = 0; i < 3; i++) {
+      futures[i].get();
+    }
+    idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
+    assertNotNull(idealState);
+    segments = idealState.getPartitionSet();
+    assertEquals(segments.size(), 30);
+
+    futures = new Future[30];
+    int index = 0;
+    for (String segment : segments) {
+      futures[index++] = executor.submit(() -> resourceManager.deleteSegment(OFFLINE_TABLE_NAME, segment));
+    }
+    for (int i = 0; i < 30; i++) {
+      futures[i].get();
     }
-    deleteSegmentExecutor.shutdown();
-    deleteSegmentExecutor.awaitTermination(1, TimeUnit.MINUTES);
+    idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
+    assertNotNull(idealState);
+    assertEquals(idealState.getNumPartitions(), 0);
 
-    idealState = TEST_INSTANCE.getHelixAdmin()
-        .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName);
-    Assert.assertEquals(idealState.getPartitionSet().size(), 0);
+    executor.shutdown();
   }
 
   @Test
   public void testAddingRealtimeTableSegmentsWithPartitionIdInZkMetadata() {
     // Add three segments: two from partition 0 and 1 from partition 1;
-    String partition0Segment0 = "realtimeResourceManagerTestTable__aa";
-    String partition0Segment1 = "realtimeResourceManagerTestTable__bb";
-    String partition1Segment1 = "realtimeResourceManagerTestTable__cc";
-    TEST_INSTANCE.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils
-            .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition0Segment0, PARTITION_COLUMN, 0),
-        "downloadUrl");
-    TEST_INSTANCE.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils
-            .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition0Segment1, PARTITION_COLUMN, 0),
-        "downloadUrl");
-    TEST_INSTANCE.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils
-            .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition1Segment1, PARTITION_COLUMN, 1),
-        "downloadUrl");
-    Map<String, Integer> segment2PartitionId = new HashMap<>();
-    segment2PartitionId.put(partition0Segment0, 0);
-    segment2PartitionId.put(partition0Segment1, 0);
-    segment2PartitionId.put(partition1Segment1, 1);
-
-    IdealState idealState = TEST_INSTANCE.getHelixAdmin()
-        .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(),
-            TableNameBuilder.REALTIME.tableNameWithType(REALTIME_TABLE_NAME));
+    String partition0Segment0 = "p0s0";
+    String partition0Segment1 = "p0s1";
+    String partition1Segment0 = "p1s0";
+    _resourceManager.addNewSegment(REALTIME_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME, partition0Segment0,
+            PARTITION_COLUMN, 0), "downloadUrl");
+    _resourceManager.addNewSegment(REALTIME_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME, partition0Segment1,
+            PARTITION_COLUMN, 0), "downloadUrl");
+    _resourceManager.addNewSegment(REALTIME_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME, partition1Segment0,
+            PARTITION_COLUMN, 1), "downloadUrl");
+
+    IdealState idealState = _resourceManager.getTableIdealState(REALTIME_TABLE_NAME);
+    assertNotNull(idealState);
     Set<String> segments = idealState.getPartitionSet();
-    Assert.assertEquals(segments.size(), 5);
-    Assert.assertTrue(segments.contains(partition0Segment0));
-    Assert.assertTrue(segments.contains(partition0Segment1));
-    Assert.assertTrue(segments.contains(partition1Segment1));
+    // 2 consuming segments, 3 uploaded segments
+    assertEquals(segments.size(), 5);
+    assertTrue(segments.contains(partition0Segment0));
+    assertTrue(segments.contains(partition0Segment1));
+    assertTrue(segments.contains(partition1Segment0));
 
     // Check the segments of the same partition is assigned to the same set of servers.
-    Map<Integer, Set<String>> segmentAssignment = new HashMap<>();
+    Map<Integer, Set<String>> partitionIdToServersMap = new HashMap<>();
     for (String segment : segments) {
-      Integer partitionId;
+      int partitionId;
       LLCSegmentName llcSegmentName = LLCSegmentName.of(segment);
       if (llcSegmentName != null) {
         partitionId = llcSegmentName.getPartitionGroupId();
       } else {
-        partitionId = segment2PartitionId.get(segment);
+        partitionId = Integer.parseInt(segment.substring(1, 2));
       }
-      Assert.assertNotNull(partitionId);
       Set<String> instances = idealState.getInstanceSet(segment);
-      if (segmentAssignment.containsKey(partitionId)) {
-        Assert.assertEquals(instances, segmentAssignment.get(partitionId));
+      if (partitionIdToServersMap.containsKey(partitionId)) {
+        assertEquals(instances, partitionIdToServersMap.get(partitionId));
       } else {
-        segmentAssignment.put(partitionId, instances);
+        partitionIdToServersMap.put(partitionId, instances);
       }
     }
   }
 
   @AfterClass
   public void tearDown() {
-    TEST_INSTANCE.cleanup();
+    _testInstance.cleanup();
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index 4a47d56512..f411be0daa 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -71,7 +71,7 @@ public class SegmentMetadataMockUtils {
     return segmentZKMetadata;
   }
 
-  public static SegmentMetadata mockSegmentMetadataWithPartitionInfo(String tableName, String segmentName,
+  public static SegmentMetadata mockSegmentMetadataWithPartitionInfo(String rawTableName, String segmentName,
       String columnName, int partitionNumber) {
     ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
     Set<Integer> partitions = Collections.singleton(partitionNumber);
@@ -82,7 +82,7 @@ public class SegmentMetadataMockUtils {
     if (columnName != null) {
       when(segmentMetadata.getColumnMetadataFor(columnName)).thenReturn(columnMetadata);
     }
-    when(segmentMetadata.getTableName()).thenReturn(tableName);
+    when(segmentMetadata.getTableName()).thenReturn(rawTableName);
     when(segmentMetadata.getName()).thenReturn(segmentName);
     when(segmentMetadata.getCrc()).thenReturn("0");
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org