You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/07/28 17:29:07 UTC
[pinot] branch master updated: Make Preload Integration test more extensible (#11195)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b69f438a79 Make Preload Integration test more extensible (#11195)
b69f438a79 is described below
commit b69f438a79716179c3a7c8161a67d675bdd9909e
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Fri Jul 28 22:59:01 2023 +0530
Make Preload Integration test more extensible (#11195)
* Make Preload Integration test more extensible
* Make snapshot method protected as well
---------
Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
.../UpsertTableSegmentPreloadIntegrationTest.java | 38 ++++++++++++++--------
1 file changed, 24 insertions(+), 14 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index e662403347..0bd5a84af6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -73,12 +73,17 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
startBroker();
startServers(NUM_SERVERS);
- // Unpack the Avro files
- List<File> avroFiles = unpackAvroData(_tempDir);
-
// Start Kafka and push data into Kafka
startKafka();
+ populateTables();
+ }
+
+ protected void populateTables()
+ throws Exception {
+ // Unpack the Avro files
+ List<File> avroFiles = unpackAvroData(_tempDir);
+
// Create and upload schema and table config
Schema schema = createSchema();
addSchema(schema);
@@ -192,6 +197,16 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
assertEquals(getCurrentCountStarResult(), getCountStarResult());
assertEquals(getCurrentCountStarResultWithoutUpsert(), getCountStarResultWithoutUpsert());
+ waitForSnapshotCreation();
+
+ // Restart the servers and check again
+ restartServers();
+ verifyIdealState(7);
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ protected void waitForSnapshotCreation()
+ throws Exception {
Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
// trigger force commit for snapshots
String jobId = forceCommit(getTableName());
@@ -211,7 +226,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
serverStarter.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR);
File[] files = new File(segmentDir, getTableName() + "_REALTIME").listFiles();
for (File file : files) {
- if (file.getName().contains("tmp") || file.getName().contains("consumer")) {
+ if (!file.getName().startsWith(getTableName())) {
continue;
}
if (file.isDirectory()) {
@@ -231,15 +246,10 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
} catch (Exception e) {
return false;
}
- }, 60000L, "Error verifying force commit operation on table!");
-
- // Restart the servers and check again
- restartServers();
- verifyIdealState(7);
- waitForAllDocsLoaded(600_000L);
+ }, 120000L, "Error verifying force commit operation on table!");
}
- private void verifyIdealState(int numSegmentsExpected) {
+ protected void verifyIdealState(int numSegmentsExpected) {
IdealState idealState = HelixHelper.getTableIdealState(_helixManager, REALTIME_TABLE_NAME);
Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
assertEquals(segmentAssignment.size(), numSegmentsExpected);
@@ -295,7 +305,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
}
}
- public Set<String> getConsumingSegmentsFromIdealState(String tableNameWithType) {
+ protected Set<String> getConsumingSegmentsFromIdealState(String tableNameWithType) {
IdealState tableIdealState = _controllerStarter.getHelixResourceManager().getTableIdealState(tableNameWithType);
Map<String, Map<String, String>> segmentAssignment = tableIdealState.getRecord().getMapFields();
Set<String> matchingSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
@@ -308,7 +318,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
return matchingSegments;
}
- public boolean isForceCommitJobCompleted(String forceCommitJobId)
+ protected boolean isForceCommitJobCompleted(String forceCommitJobId)
throws Exception {
String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forForceCommitJobStatus(forceCommitJobId));
JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
@@ -318,7 +328,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
}
- private String forceCommit(String tableName)
+ protected String forceCommit(String tableName)
throws Exception {
String response = sendPostRequest(_controllerRequestURLBuilder.forTableForceCommit(tableName), null);
return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org