Posted to commits@pinot.apache.org by kh...@apache.org on 2023/07/26 10:52:14 UTC
[pinot] branch master updated: Add Integration test for Upsert Preload (#11160)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d9c64f347a Add Integration test for Upsert Preload (#11160)
d9c64f347a is described below
commit d9c64f347af0d1c6e27c8b0c304033be9f463f69
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Jul 26 16:22:09 2023 +0530
Add Integration test for Upsert Preload (#11160)
* Add Integration test for Upsert Preload
* Fix tests
* Reverting Base method changes as they break compatibility tests
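
In short, the new test turns on upsert preload by enabling both preload and snapshotting in the table's upsert config (preload bootstraps upsert metadata from the snapshot files, so the two flags go together), and pins the server's segment preload pool to a single thread. A condensed sketch of that setup, using the calls from the patch below:

    // Table side: enable preload + snapshot on the upsert config before adding the table.
    TableConfig tableConfig =
        createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
    tableConfig.getUpsertConfig().setEnablePreload(true);
    tableConfig.getUpsertConfig().setEnableSnapshot(true);
    addTableConfig(tableConfig);

    // Server side: cap the preload thread pool at 1 so the preload path runs deterministically.
    @Override
    protected void overrideServerConf(PinotConfiguration serverConf) {
      serverConf.setProperty(
          CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".max.segment.preload.threads", "1");
    }
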
---
... UpsertTableSegmentPreloadIntegrationTest.java} | 123 +++++++++++++++++++--
.../UpsertTableSegmentUploadIntegrationTest.java | 3 +-
2 files changed, 115 insertions(+), 11 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
similarity index 60%
copy from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
copy to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index 7fddd5e43c..e662403347 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -18,18 +18,26 @@
*/
package org.apache.pinot.integration.tests;
+import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
@@ -38,10 +46,11 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
-public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrationTestSet {
- private static final int NUM_SERVERS = 2;
+public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegrationTestSet {
+ private static final int NUM_SERVERS = 1;
private static final String PRIMARY_KEY_COL = "clientId";
private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME);
@@ -69,22 +78,31 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
// Start Kafka and push data into Kafka
startKafka();
- pushAvroIntoKafka(avroFiles);
// Create and upload schema and table config
Schema schema = createSchema();
addSchema(schema);
- TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
+ TableConfig tableConfig =
+ createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
+ tableConfig.getUpsertConfig().setEnablePreload(true);
+ tableConfig.getUpsertConfig().setEnableSnapshot(true);
addTableConfig(tableConfig);
// Create and upload segments
ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
+ pushAvroIntoKafka(avroFiles);
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
}
+ @Override
+ protected void overrideServerConf(PinotConfiguration serverConf) {
+ serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".max.segment.preload.threads",
+ "1");
+ }
+
@AfterClass
public void tearDown()
throws IOException {
@@ -166,27 +184,78 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
@Test
public void testSegmentAssignment()
throws Exception {
- verifyIdealState();
+ verifyIdealState(5);
// Run the real-time segment validation and check again
_controllerStarter.getRealtimeSegmentValidationManager().run();
- verifyIdealState();
+ verifyIdealState(5);
assertEquals(getCurrentCountStarResult(), getCountStarResult());
assertEquals(getCurrentCountStarResultWithoutUpsert(), getCountStarResultWithoutUpsert());
+ Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
+ // trigger force commit for snapshots
+ String jobId = forceCommit(getTableName());
+
+ Set<String> finalConsumingSegments = consumingSegments;
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ if (isForceCommitJobCompleted(jobId)) {
+ assertTrue(_controllerStarter.getHelixResourceManager()
+ .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false)
+ .containsAll(finalConsumingSegments));
+
+ int snapshotFileCount = 0;
+ for (BaseServerStarter serverStarter : _serverStarters) {
+ String segmentDir =
+ serverStarter.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR);
+ File[] files = new File(segmentDir, getTableName() + "_REALTIME").listFiles();
+ for (File file : files) {
+ if (file.getName().contains("tmp") || file.getName().contains("consumer")) {
+ continue;
+ }
+ if (file.isDirectory()) {
+ File segmentV3Dir = new File(file, "v3");
+ File[] segmentFiles = segmentV3Dir.listFiles();
+ for (File segmentFile : segmentFiles) {
+ if (segmentFile.getName().endsWith(".snapshot")) {
+ snapshotFileCount++;
+ }
+ }
+ }
+ }
+ }
+ return snapshotFileCount == 5;
+ }
+ return false;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 60000L, "Error verifying force commit operation on table!");
+
// Restart the servers and check again
restartServers();
- verifyIdealState();
+ verifyIdealState(7);
waitForAllDocsLoaded(600_000L);
}
- private void verifyIdealState() {
+ private void verifyIdealState(int numSegmentsExpected) {
IdealState idealState = HelixHelper.getTableIdealState(_helixManager, REALTIME_TABLE_NAME);
Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
- assertEquals(segmentAssignment.size(), 5);
+ assertEquals(segmentAssignment.size(), numSegmentsExpected);
String serverForPartition0 = null;
String serverForPartition1 = null;
+
+ int maxSequenceNumber = 0;
+ for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ maxSequenceNumber = Math.max(maxSequenceNumber, llcSegmentName.getSequenceNumber());
+ }
+ }
+
for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
@@ -196,7 +265,12 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
Map.Entry<String, String> instanceIdAndState = instanceStateMap.entrySet().iterator().next();
String state = instanceIdAndState.getValue();
if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- assertEquals(state, SegmentStateModel.CONSUMING);
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ if (llcSegmentName.getSequenceNumber() < maxSequenceNumber) {
+ assertEquals(state, SegmentStateModel.ONLINE);
+ } else {
+ assertEquals(state, SegmentStateModel.CONSUMING);
+ }
} else {
assertEquals(state, SegmentStateModel.ONLINE);
}
@@ -221,6 +295,35 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
}
}
+ public Set<String> getConsumingSegmentsFromIdealState(String tableNameWithType) {
+ IdealState tableIdealState = _controllerStarter.getHelixResourceManager().getTableIdealState(tableNameWithType);
+ Map<String, Map<String, String>> segmentAssignment = tableIdealState.getRecord().getMapFields();
+ Set<String> matchingSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
+ for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
+ Map<String, String> instanceStateMap = entry.getValue();
+ if (instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)) {
+ matchingSegments.add(entry.getKey());
+ }
+ }
+ return matchingSegments;
+ }
+
+ public boolean isForceCommitJobCompleted(String forceCommitJobId)
+ throws Exception {
+ String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forForceCommitJobStatus(forceCommitJobId));
+ JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
+
+ assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId);
+ assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT");
+ return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
+ }
+
+ private String forceCommit(String tableName)
+ throws Exception {
+ String response = sendPostRequest(_controllerRequestURLBuilder.forTableForceCommit(tableName), null);
+ return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();
+ }
+
private static int getSegmentPartitionId(String segmentName) {
switch (segmentName) {
case UPLOADED_SEGMENT_1:
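
A note on the test flow above: the test force-commits the consuming segments (force commit is what produces the snapshot files) and then polls the controller's force-commit job status until no segments remain to be committed. Stripped down to the polling skeleton, using the forceCommit and isForceCommitJobCompleted helpers defined in this patch:

    String jobId = forceCommit(getTableName());
    TestUtils.waitForCondition(aVoid -> {
      try {
        // Done once the job status reports numberOfSegmentsYetToBeCommitted == 0;
        // the full test also re-checks the ideal state and snapshot file count here.
        return isForceCommitJobCompleted(jobId);
      } catch (Exception e) {
        return false; // keep polling until the 60s timeout
      }
    }, 60000L, "Error verifying force commit operation on table!");
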
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index 7fddd5e43c..2b9f94ae10 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -74,7 +74,8 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
// Create and upload schema and table config
Schema schema = createSchema();
addSchema(schema);
- TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
+ TableConfig tableConfig =
+ createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
addTableConfig(tableConfig);
// Create and upload segments
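
One more detail on how the preload setup is verified: after the force commit, the test walks each server's data directory and counts the .snapshot files (validDocIds snapshots) under every segment's v3 directory, skipping consumer and tmp entries, expecting one per segment in the ideal state at that point. Trimmed from the patch:

    int snapshotFileCount = 0;
    for (BaseServerStarter serverStarter : _serverStarters) {
      String dataDir = serverStarter.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR);
      for (File file : new File(dataDir, getTableName() + "_REALTIME").listFiles()) {
        // Only committed segment directories hold snapshots; skip consumer dirs and tmp files.
        if (!file.isDirectory() || file.getName().contains("tmp") || file.getName().contains("consumer")) {
          continue;
        }
        for (File segmentFile : new File(file, "v3").listFiles()) {
          if (segmentFile.getName().endsWith(".snapshot")) {
            snapshotFileCount++;
          }
        }
      }
    }
    // The test asserts snapshotFileCount == 5 before restarting the servers,
    // which is what exercises the preload path on startup.
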