You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/07/28 17:29:07 UTC

[pinot] branch master updated: Make Preload Integration test more extensible (#11195)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b69f438a79 Make Preload Integration test more extensible (#11195)
b69f438a79 is described below

commit b69f438a79716179c3a7c8161a67d675bdd9909e
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Fri Jul 28 22:59:01 2023 +0530

    Make Preload Integration test more extensible (#11195)
    
    * Make Preload Integration test more extensible
    
    * Make snapshot method protected as well
    
    ---------
    
    Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
 .../UpsertTableSegmentPreloadIntegrationTest.java  | 38 ++++++++++++++--------
 1 file changed, 24 insertions(+), 14 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index e662403347..0bd5a84af6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -73,12 +73,17 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
     startBroker();
     startServers(NUM_SERVERS);
 
-    // Unpack the Avro files
-    List<File> avroFiles = unpackAvroData(_tempDir);
-
     // Start Kafka and push data into Kafka
     startKafka();
 
+    populateTables();
+  }
+
+  protected void populateTables()
+      throws Exception {
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
     // Create and upload schema and table config
     Schema schema = createSchema();
     addSchema(schema);
@@ -192,6 +197,16 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
     assertEquals(getCurrentCountStarResult(), getCountStarResult());
     assertEquals(getCurrentCountStarResultWithoutUpsert(), getCountStarResultWithoutUpsert());
 
+    waitForSnapshotCreation();
+
+    // Restart the servers and check again
+    restartServers();
+    verifyIdealState(7);
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  protected void waitForSnapshotCreation()
+      throws Exception {
     Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
     // trigger force commit for snapshots
     String jobId = forceCommit(getTableName());
@@ -211,7 +226,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
                 serverStarter.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR);
             File[] files = new File(segmentDir, getTableName() + "_REALTIME").listFiles();
             for (File file : files) {
-              if (file.getName().contains("tmp") || file.getName().contains("consumer")) {
+              if (!file.getName().startsWith(getTableName())) {
                 continue;
               }
               if (file.isDirectory()) {
@@ -231,15 +246,10 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
       } catch (Exception e) {
         return false;
       }
-    }, 60000L, "Error verifying force commit operation on table!");
-
-    // Restart the servers and check again
-    restartServers();
-    verifyIdealState(7);
-    waitForAllDocsLoaded(600_000L);
+    }, 120000L, "Error verifying force commit operation on table!");
   }
 
-  private void verifyIdealState(int numSegmentsExpected) {
+  protected void verifyIdealState(int numSegmentsExpected) {
     IdealState idealState = HelixHelper.getTableIdealState(_helixManager, REALTIME_TABLE_NAME);
     Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
     assertEquals(segmentAssignment.size(), numSegmentsExpected);
@@ -295,7 +305,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
     }
   }
 
-  public Set<String> getConsumingSegmentsFromIdealState(String tableNameWithType) {
+  protected Set<String> getConsumingSegmentsFromIdealState(String tableNameWithType) {
     IdealState tableIdealState = _controllerStarter.getHelixResourceManager().getTableIdealState(tableNameWithType);
     Map<String, Map<String, String>> segmentAssignment = tableIdealState.getRecord().getMapFields();
     Set<String> matchingSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
@@ -308,7 +318,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
     return matchingSegments;
   }
 
-  public boolean isForceCommitJobCompleted(String forceCommitJobId)
+  protected boolean isForceCommitJobCompleted(String forceCommitJobId)
       throws Exception {
     String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forForceCommitJobStatus(forceCommitJobId));
     JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
@@ -318,7 +328,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
     return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
   }
 
-  private String forceCommit(String tableName)
+  protected String forceCommit(String tableName)
       throws Exception {
     String response = sendPostRequest(_controllerRequestURLBuilder.forTableForceCommit(tableName), null);
     return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org