Posted to commits@pinot.apache.org by kh...@apache.org on 2023/07/26 10:52:14 UTC

[pinot] branch master updated: Add Integration test for Upsert Preload (#11160)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d9c64f347a Add Integration test for Upsert Preload (#11160)
d9c64f347a is described below

commit d9c64f347af0d1c6e27c8b0c304033be9f463f69
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Jul 26 16:22:09 2023 +0530

    Add Integration test for Upsert Preload (#11160)
    
    * Add Integration test for Upsert Preload
    
    * Fix tests
    
    * Revert base method changes, as they break compatibility tests
---
 ... UpsertTableSegmentPreloadIntegrationTest.java} | 123 +++++++++++++++++++--
 .../UpsertTableSegmentUploadIntegrationTest.java   |   3 +-
 2 files changed, 115 insertions(+), 11 deletions(-)
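
In brief, the new UpsertTableSegmentPreloadIntegrationTest copies the existing upload test and layers preloading on top of it: it enables validDocIds snapshots and preloading in the table's upsert config, force-commits the consuming segments so that snapshots are written to disk, and restarts the servers so the committed segments are preloaded from those snapshots. A minimal sketch of the config side, lifted from the diff below (createUpsertTableConfig is the existing test-base helper):

    TableConfig tableConfig =
        createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
    // Write a validDocIds snapshot when a segment commits ...
    tableConfig.getUpsertConfig().setEnableSnapshot(true);
    // ... and preload committed segments from those snapshots on server startup.
    tableConfig.getUpsertConfig().setEnablePreload(true);
    addTableConfig(tableConfig);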

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
similarity index 60%
copy from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
copy to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index 7fddd5e43c..e662403347 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -18,18 +18,26 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
@@ -38,10 +46,11 @@ import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 
-public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrationTestSet {
-  private static final int NUM_SERVERS = 2;
+public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegrationTestSet {
+  private static final int NUM_SERVERS = 1;
   private static final String PRIMARY_KEY_COL = "clientId";
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME);
 
@@ -69,22 +78,31 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
 
     // Start Kafka and push data into Kafka
     startKafka();
-    pushAvroIntoKafka(avroFiles);
 
     // Create and upload schema and table config
     Schema schema = createSchema();
     addSchema(schema);
-    TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
+    TableConfig tableConfig =
+        createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
+    tableConfig.getUpsertConfig().setEnablePreload(true);
+    tableConfig.getUpsertConfig().setEnableSnapshot(true);
     addTableConfig(tableConfig);
 
     // Create and upload segments
     ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
     uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
 
+    pushAvroIntoKafka(avroFiles);
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
   }
 
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".max.segment.preload.threads",
+        "1");
+  }
+
   @AfterClass
   public void tearDown()
       throws IOException {
@@ -166,27 +184,78 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
   @Test
   public void testSegmentAssignment()
       throws Exception {
-    verifyIdealState();
+    verifyIdealState(5);
 
     // Run the real-time segment validation and check again
     _controllerStarter.getRealtimeSegmentValidationManager().run();
-    verifyIdealState();
+    verifyIdealState(5);
     assertEquals(getCurrentCountStarResult(), getCountStarResult());
     assertEquals(getCurrentCountStarResultWithoutUpsert(), getCountStarResultWithoutUpsert());
 
+    Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
+    // trigger force commit for snapshots
+    String jobId = forceCommit(getTableName());
+
+    Set<String> finalConsumingSegments = consumingSegments;
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        if (isForceCommitJobCompleted(jobId)) {
+          assertTrue(_controllerStarter.getHelixResourceManager()
+              .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false)
+              .containsAll(finalConsumingSegments));
+
+          int snapshotFileCount = 0;
+          for (BaseServerStarter serverStarter : _serverStarters) {
+            String segmentDir =
+                serverStarter.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR);
+            File[] files = new File(segmentDir, getTableName() + "_REALTIME").listFiles();
+            for (File file : files) {
+              if (file.getName().contains("tmp") || file.getName().contains("consumer")) {
+                continue;
+              }
+              if (file.isDirectory()) {
+                File segmentV3Dir = new File(file, "v3");
+                File[] segmentFiles = segmentV3Dir.listFiles();
+                for (File segmentFile : segmentFiles) {
+                  if (segmentFile.getName().endsWith(".snapshot")) {
+                    snapshotFileCount++;
+                  }
+                }
+              }
+            }
+          }
+          return snapshotFileCount == 5;
+        }
+        return false;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60000L, "Error verifying force commit operation on table!");
+
     // Restart the servers and check again
     restartServers();
-    verifyIdealState();
+    verifyIdealState(7);
     waitForAllDocsLoaded(600_000L);
   }
 
-  private void verifyIdealState() {
+  private void verifyIdealState(int numSegmentsExpected) {
     IdealState idealState = HelixHelper.getTableIdealState(_helixManager, REALTIME_TABLE_NAME);
     Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
-    assertEquals(segmentAssignment.size(), 5);
+    assertEquals(segmentAssignment.size(), numSegmentsExpected);
 
     String serverForPartition0 = null;
     String serverForPartition1 = null;
+
+    int maxSequenceNumber = 0;
+    for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        maxSequenceNumber = Math.max(maxSequenceNumber, llcSegmentName.getSequenceNumber());
+      }
+    }
+
     for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
       String segmentName = entry.getKey();
       Map<String, String> instanceStateMap = entry.getValue();
@@ -196,7 +265,12 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
       Map.Entry<String, String> instanceIdAndState = instanceStateMap.entrySet().iterator().next();
       String state = instanceIdAndState.getValue();
       if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-        assertEquals(state, SegmentStateModel.CONSUMING);
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        if (llcSegmentName.getSequenceNumber() < maxSequenceNumber) {
+          assertEquals(state, SegmentStateModel.ONLINE);
+        } else {
+          assertEquals(state, SegmentStateModel.CONSUMING);
+        }
       } else {
         assertEquals(state, SegmentStateModel.ONLINE);
       }
@@ -221,6 +295,35 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
     }
   }
 
+  public Set<String> getConsumingSegmentsFromIdealState(String tableNameWithType) {
+    IdealState tableIdealState = _controllerStarter.getHelixResourceManager().getTableIdealState(tableNameWithType);
+    Map<String, Map<String, String>> segmentAssignment = tableIdealState.getRecord().getMapFields();
+    Set<String> matchingSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
+    for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
+      Map<String, String> instanceStateMap = entry.getValue();
+      if (instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)) {
+        matchingSegments.add(entry.getKey());
+      }
+    }
+    return matchingSegments;
+  }
+
+  public boolean isForceCommitJobCompleted(String forceCommitJobId)
+      throws Exception {
+    String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forForceCommitJobStatus(forceCommitJobId));
+    JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
+
+    assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId);
+    assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT");
+    return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
+  }
+
+  private String forceCommit(String tableName)
+      throws Exception {
+    String response = sendPostRequest(_controllerRequestURLBuilder.forTableForceCommit(tableName), null);
+    return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();
+  }
+
   private static int getSegmentPartitionId(String segmentName) {
     switch (segmentName) {
       case UPLOADED_SEGMENT_1:
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index 7fddd5e43c..2b9f94ae10 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -74,7 +74,8 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
     // Create and upload schema and table config
     Schema schema = createSchema();
     addSchema(schema);
-    TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
+    TableConfig tableConfig =
+        createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
     addTableConfig(tableConfig);
 
     // Create and upload segments
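
A note on the segment counts the new test asserts: before the force commit the ideal state holds 5 segments (the 3 uploaded segments plus one CONSUMING segment per Kafka partition); the force commit seals those 2 consuming segments and opens 2 new ones, hence 7 segments after the restart; and each of the 5 sealed or uploaded segments ends up with a validDocIds .snapshot file under its v3 directory, which is what the snapshotFileCount == 5 check counts. To turn preloading on for a standalone server outside the test harness, the same property can be set directly. A hedged sketch, assuming PinotConfiguration's no-argument constructor and that the prefix constant resolves to "pinot.server.instance" (verify against CommonConstants for the Pinot version in use):

    // Sketch only: builds the same key as overrideServerConf() in the test.
    PinotConfiguration serverConf = new PinotConfiguration();
    // CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX is assumed to be
    // "pinot.server.instance", yielding pinot.server.instance.max.segment.preload.threads.
    serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX
        + ".max.segment.preload.threads", "4"); // >1 preloads in parallel; the test pins it to 1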


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org