Posted to commits@pinot.apache.org by kh...@apache.org on 2022/07/04 11:35:50 UTC

[pinot] branch master updated: Fix allSegmentLoaded check (#9010)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 792ff66eaf Fix allSegmentLoaded check (#9010)
792ff66eaf is described below

commit 792ff66eaf74357d00e98b921c6fbcb242cde807
Author: Saurabh Dubey <sa...@gmail.com>
AuthorDate: Mon Jul 4 17:05:43 2022 +0530

    Fix allSegmentLoaded check (#9010)
    
    * Fix allSegmentLoaded check
    
    * Comments + dedup ITs
    
    * Review comments
    
    Co-authored-by: Saurabh Dubey <sa...@Saurabhs-MacBook-Pro.local>
---
 .../tests/BaseClusterIntegrationTest.java          |  21 +++
 .../integration/tests/DedupIntegrationTest.java    | 153 +++++++++++++++++++++
 .../src/test/resources/dedupIngestionTestData.avro | Bin 0 -> 294 bytes
 .../test/resources/dedupIngestionTestData.tar.gz   | Bin 0 -> 321 bytes
 .../test/resources/dedupIngestionTestSchema.schema |  23 ++++
 .../local/utils/tablestate/TableStateUtils.java    |  69 ++++++----
 6 files changed, 241 insertions(+), 25 deletions(-)
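
The gist of the fix: previously isAllSegmentsLoaded() fetched the instance's CurrentState up front and returned false whenever it was missing, even for a table whose IdealState had no ONLINE segments to check. The reworked method first collects the ONLINE segments from the IdealState and only consults the CurrentState when that list is non-empty. A minimal sketch of the new control flow, with plain maps standing in for the Helix IdealState/CurrentState records (the names below are illustrative, not Pinot or Helix APIs):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public final class AllSegmentsLoadedSketch {

      // Plain maps stand in for Helix records: segment name -> state.
      static boolean isAllSegmentsLoaded(Map<String, String> idealState, Map<String, String> currentState) {
        // Step 1: collect the segments that the ideal state expects to be ONLINE.
        List<String> onlineSegments = new ArrayList<>();
        for (Map.Entry<String, String> entry : idealState.entrySet()) {
          if ("ONLINE".equals(entry.getValue())) {
            onlineSegments.add(entry.getKey());
          }
        }
        // Step 2: the current state is only needed when there is something to
        // verify; with no ONLINE segments the check now passes vacuously.
        if (!onlineSegments.isEmpty()) {
          if (currentState == null) {
            return false;
          }
          for (String segmentName : onlineSegments) {
            if (!"ONLINE".equals(currentState.get(segmentName))) {
              return false;
            }
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Map<String, String> ideal = Map.of("seg__0", "ONLINE", "seg__1", "OFFLINE");
        System.out.println(isAllSegmentsLoaded(ideal, Map.of("seg__0", "ONLINE"))); // true
        System.out.println(isAllSegmentsLoaded(ideal, null));                       // false: something to load, no state
      }
    }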

diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index f8fe1a42d6..8308435a47 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -38,7 +38,9 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -398,6 +400,25 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
         .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).build();
   }
 
+  /**
+   * Creates a new dedup-enabled table config.
+   */
+  protected TableConfig createDedupTableConfig(File sampleAvroFile, String primaryKeyColumn, int numPartitions) {
+    AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
+    Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
+    columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));
+
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName())
+        .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
+        .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
+        .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+        .setLLC(useLlc()).setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled())
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
+        .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
+        .setDedupConfig(new DedupConfig(true, HashFunction.NONE)).build();
+  }
+
   /**
    * Returns the REALTIME table config in the cluster.
    */
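
The new helper pins dedup tables to primary-key partitioning plus strict replica-group routing: rows carrying the same primary key must land on the same partition (and be served by the same replica group) so that duplicates are checked against the same per-partition dedup metadata. A minimal sketch of that invariant, using String.hashCode as an illustrative stand-in for the "Murmur" partition function configured above:

    public final class PartitionInvariantSketch {

      // Illustrative stand-in for the configured "Murmur" partition function.
      static int partitionFor(String primaryKey, int numPartitions) {
        return (primaryKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }

      public static void main(String[] args) {
        // The same key always maps to the same partition, so every duplicate
        // of a primary key meets the same dedup state.
        System.out.println(partitionFor("3", 2) == partitionFor("3", 2)); // true
      }
    }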
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java
new file mode 100644
index 0000000000..d133c5c02b
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class DedupIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  private List<File> _avroFiles;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    // Start the controller, broker and a single server
+    startController();
+    startBroker();
+    startServers(1);
+
+    _avroFiles = unpackAvroData(_tempDir);
+    startKafka();
+    pushAvroIntoKafka(_avroFiles);
+
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createDedupTableConfig(_avroFiles.get(0), "id", getNumKafkaPartitions());
+    addTableConfig(tableConfig);
+
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Override
+  protected int getRealtimeSegmentFlushSize() {
+    // Keep the flush size small so that the test creates more than one segment
+    return 2;
+  }
+
+  @Override
+  protected String getSchemaFileName() {
+    return "dedupIngestionTestSchema.schema";
+  }
+
+  @Override
+  protected String getSchemaName() {
+    return "dedupSchema";
+  }
+
+  @Override
+  protected String getAvroTarFileName() {
+    return "dedupIngestionTestData.tar.gz";
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Override
+  protected String getPartitionColumn() {
+    return "id";
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    // Five distinct records are expected, with pk values 0-4 (matching the per-id checks in testValues)
+    return 5;
+  }
+
+  @Test
+  public void testValues()
+      throws Exception {
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
+
+    // Validate that the original values persist
+    for (int i = 0; i < getCountStarResult(); i++) {
+      Assert.assertEquals(getPinotConnection()
+          .execute("SELECT name FROM " + getTableName() + " WHERE id = " + i)
+          .getResultSet(0)
+          .getString(0),
+          "" + i);
+    }
+  }
+
+  @Test
+  public void testSegmentReload()
+      throws Exception {
+    ControllerTest.sendPostRequest(
+        StringUtil.join("/", getControllerBaseApiUrl(), "segments", getTableName(),
+            "reload?forceDownload=false"), null);
+
+    // Wait for the reload to finish
+    Thread.sleep(1000);
+
+    // Push data again
+    pushAvroIntoKafka(_avroFiles);
+
+    // Validate no change
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
+    for (int i = 0; i < getCountStarResult(); i++) {
+      Assert.assertEquals(getPinotConnection()
+              .execute("SELECT name FROM " + getTableName() + " WHERE id = " + i)
+              .getResultSet(0)
+              .getString(0),
+          "" + i);
+    }
+  }
+}
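
Both tests hinge on the same dedup semantics: the first row seen for a primary key wins, and later rows with that key are dropped at ingestion, so re-pushing the identical Avro records leaves both COUNT(*) and the per-id values unchanged. A minimal sketch of that behavior, assuming an in-memory map keyed by primary key (Pinot's actual dedup metadata manager is not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    public final class DedupSemanticsSketch {

      private final Map<Integer, String> _seenPrimaryKeys = new HashMap<>();

      // Returns true if the row is ingested, false if dropped as a duplicate.
      boolean ingest(int id, String name) {
        // putIfAbsent keeps the first value stored for a key, so replays of
        // the same data cannot change what a query for that id returns.
        return _seenPrimaryKeys.putIfAbsent(id, name) == null;
      }

      public static void main(String[] args) {
        DedupSemanticsSketch table = new DedupSemanticsSketch();
        System.out.println(table.ingest(0, "0")); // true: first occurrence kept
        System.out.println(table.ingest(0, "0")); // false: duplicate dropped
      }
    }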
diff --git a/pinot-integration-tests/src/test/resources/dedupIngestionTestData.avro b/pinot-integration-tests/src/test/resources/dedupIngestionTestData.avro
new file mode 100644
index 0000000000..daf26bd4ad
Binary files /dev/null and b/pinot-integration-tests/src/test/resources/dedupIngestionTestData.avro differ
diff --git a/pinot-integration-tests/src/test/resources/dedupIngestionTestData.tar.gz b/pinot-integration-tests/src/test/resources/dedupIngestionTestData.tar.gz
new file mode 100644
index 0000000000..c868f292b0
Binary files /dev/null and b/pinot-integration-tests/src/test/resources/dedupIngestionTestData.tar.gz differ
diff --git a/pinot-integration-tests/src/test/resources/dedupIngestionTestSchema.schema b/pinot-integration-tests/src/test/resources/dedupIngestionTestSchema.schema
new file mode 100644
index 0000000000..993c42eec9
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/dedupIngestionTestSchema.schema
@@ -0,0 +1,23 @@
+{
+  "dimensionFieldSpecs": [
+    {
+      "dataType": "INT",
+      "singleValueField": true,
+      "name": "id"
+    },
+    {
+      "dataType": "STRING",
+      "singleValueField": true,
+      "name": "name"
+    }
+  ],
+  "timeFieldSpec": {
+    "incomingGranularitySpec": {
+      "timeType": "DAYS",
+      "dataType": "INT",
+      "name": "DaysSinceEpoch"
+    }
+  },
+  "primaryKeyColumns": ["id"],
+  "schemaName": "dedupSchema"
+}
\ No newline at end of file
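
The Avro test data itself is binary and not visible in this diff. For readers who want to reproduce something like it, a hedged sketch of records that would satisfy the schema above and the test's expectations (five distinct ids, each id's name equal to the id, plus duplicates for dedup to drop; the exact contents of the checked-in file may differ):

    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public final class DedupTestDataSketch {
      public static void main(String[] args) throws Exception {
        Schema schema = SchemaBuilder.record("dedupRecord").fields()
            .requiredInt("id")
            .requiredString("name")
            .requiredInt("DaysSinceEpoch")
            .endRecord();
        try (DataFileWriter<GenericRecord> writer =
            new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
          writer.create(schema, new File("dedupIngestionTestData.avro"));
          for (int pass = 0; pass < 2; pass++) { // second pass duplicates every pk
            for (int id = 0; id < 5; id++) {
              GenericRecord record = new GenericData.Record(schema);
              record.put("id", id);
              record.put("name", String.valueOf(id));
              record.put("DaysSinceEpoch", 19000); // illustrative day value
              writer.append(record);
            }
          }
        }
      }
    }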
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index 2392a0bcd1..4e74682220 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.segment.local.utils.tablestate;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -36,6 +38,14 @@ public class TableStateUtils {
   private TableStateUtils() {
   }
 
+  /**
+   * Checks whether all segments for the given table are successfully loaded.
+   * This method gets all segments in the IDEALSTATE and CURRENTSTATE for the given table,
+   * and then checks that every ONLINE segment in the IDEALSTATE is also ONLINE in the CURRENTSTATE.
+   * @param helixManager helix manager for the server instance
+   * @param tableNameWithType table name for which segment state is to be checked
+   * @return true if all segments for the given table are successfully loaded, false otherwise
+   */
   public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
     HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
@@ -44,24 +54,11 @@ public class TableStateUtils {
       LOGGER.warn("Failed to find ideal state for table: {}", tableNameWithType);
       return false;
     }
-    String instanceName = helixManager.getInstanceName();
-    LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
-    if (liveInstance == null) {
-      LOGGER.warn("Failed to find live instance for instance: {}", instanceName);
-      return false;
-    }
-    String sessionId = liveInstance.getEphemeralOwner();
-    CurrentState currentState =
-        dataAccessor.getProperty(keyBuilder.currentState(instanceName, sessionId, tableNameWithType));
-    if (currentState == null) {
-      LOGGER.warn("Failed to find current state for instance: {}, sessionId: {}, table: {}", instanceName, sessionId,
-          tableNameWithType);
-      return false;
-    }
 
-    // Check if ideal state and current state matches for all segments assigned to the current instance
+    // Get all ONLINE segments from idealState
+    String instanceName = helixManager.getInstanceName();
+    List<String> onlineSegments = new ArrayList<>();
     Map<String, Map<String, String>> idealStatesMap = idealState.getRecord().getMapFields();
-    Map<String, String> currentStateMap = currentState.getPartitionStateMap();
     for (Map.Entry<String, Map<String, String>> entry : idealStatesMap.entrySet()) {
       String segmentName = entry.getKey();
       Map<String, String> instanceStateMap = entry.getValue();
@@ -70,17 +67,39 @@ public class TableStateUtils {
       if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(expectedState)) {
         continue;
       }
-      String actualState = currentStateMap.get(segmentName);
-      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(actualState)) {
-        if (CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(actualState)) {
-          LOGGER.error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType,
-              expectedState);
-        } else {
-          LOGGER.info("Find unloaded segment: {}, table: {}, expected: {}, actual: {}", segmentName, tableNameWithType,
-              expectedState, actualState);
-        }
+      onlineSegments.add(segmentName);
+    }
+
+    if (onlineSegments.size() > 0) {
+      LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
+      if (liveInstance == null) {
+        LOGGER.warn("Failed to find live instance for instance: {}", instanceName);
+        return false;
+      }
+      String sessionId = liveInstance.getEphemeralOwner();
+      CurrentState currentState =
+          dataAccessor.getProperty(keyBuilder.currentState(instanceName, sessionId, tableNameWithType));
+      if (currentState == null) {
+        LOGGER.warn("Failed to find current state for instance: {}, sessionId: {}, table: {}", instanceName, sessionId,
+            tableNameWithType);
         return false;
       }
+      // Check that the ideal state and current state match for all segments assigned to the current instance
+      Map<String, String> currentStateMap = currentState.getPartitionStateMap();
+
+      for (String segmentName : onlineSegments) {
+        String actualState = currentStateMap.get(segmentName);
+        if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(actualState)) {
+          if (CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(actualState)) {
+            LOGGER.error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType,
+                CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+          } else {
+            LOGGER.info("Find unloaded segment: {}, table: {}, expected: {}, actual: {}", segmentName,
+                tableNameWithType, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE, actualState);
+          }
+          return false;
+        }
+      }
     }
 
     LOGGER.info("All segments loaded for table: {}", tableNameWithType);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org