You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/07/04 11:35:50 UTC
[pinot] branch master updated: Fix allSegmentLoaded check (#9010)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 792ff66eaf Fix allSegmentLoaded check (#9010)
792ff66eaf is described below
commit 792ff66eaf74357d00e98b921c6fbcb242cde807
Author: Saurabh Dubey <sa...@gmail.com>
AuthorDate: Mon Jul 4 17:05:43 2022 +0530
Fix allSegmentLoaded check (#9010)
* Fix allSegmentLoaded check
* Comments + dedup ITs
* Review comments
Co-authored-by: Saurabh Dubey <sa...@Saurabhs-MacBook-Pro.local>
---
.../tests/BaseClusterIntegrationTest.java | 21 +++
.../integration/tests/DedupIntegrationTest.java | 153 +++++++++++++++++++++
.../src/test/resources/dedupIngestionTestData.avro | Bin 0 -> 294 bytes
.../test/resources/dedupIngestionTestData.tar.gz | Bin 0 -> 321 bytes
.../test/resources/dedupIngestionTestSchema.schema | 23 ++++
.../local/utils/tablestate/TableStateUtils.java | 69 ++++++----
6 files changed, 241 insertions(+), 25 deletions(-)
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index f8fe1a42d6..8308435a47 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -38,7 +38,9 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -398,6 +400,25 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).build();
}
+  /**
+   * Creates a REALTIME table config with dedup enabled.
+   *
+   * <p>On top of the per-test settings returned by this class's getters, the config partitions segments on
+   * {@code primaryKeyColumn} with the Murmur partition function, uses the same column for replica-group assignment,
+   * routes queries through the strict replica-group instance selector, and enables dedup with
+   * {@link HashFunction#NONE}.
+   *
+   * @param sampleAvroFile avro file assigned to the static {@code AvroFileSchemaKafkaAvroMessageDecoder._avroFile}
+   * @param primaryKeyColumn column used for segment partitioning and replica-group assignment
+   * @param numPartitions number of Murmur partitions for {@code primaryKeyColumn}
+   * @return the dedup-enabled REALTIME table config
+   */
+  protected TableConfig createDedupTableConfig(File sampleAvroFile, String primaryKeyColumn, int numPartitions) {
+    // NOTE: mutates static state of the test Kafka message decoder
+    AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
+
+    Map<String, ColumnPartitionConfig> partitionConfigByColumn = new HashMap<>();
+    partitionConfigByColumn.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));
+    SegmentPartitionConfig partitionConfig = new SegmentPartitionConfig(partitionConfigByColumn);
+    ReplicaGroupStrategyConfig replicaGroupConfig = new ReplicaGroupStrategyConfig(primaryKeyColumn, 1);
+    RoutingConfig routingConfig =
+        new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
+    DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE);
+
+    return new TableConfigBuilder(TableType.REALTIME)
+        .setTableName(getTableName())
+        .setSchemaName(getSchemaName())
+        .setTimeColumnName(getTimeColumnName())
+        .setFieldConfigList(getFieldConfigs())
+        .setNumReplicas(getNumReplicas())
+        .setSegmentVersion(getSegmentVersion())
+        .setLoadMode(getLoadMode())
+        .setTaskConfig(getTaskConfig())
+        .setBrokerTenant(getBrokerTenant())
+        .setServerTenant(getServerTenant())
+        .setIngestionConfig(getIngestionConfig())
+        .setLLC(useLlc())
+        .setStreamConfigs(getStreamConfigs())
+        .setNullHandlingEnabled(getNullHandlingEnabled())
+        .setRoutingConfig(routingConfig)
+        .setSegmentPartitionConfig(partitionConfig)
+        .setReplicaGroupStrategyConfig(replicaGroupConfig)
+        .setDedupConfig(dedupConfig)
+        .build();
+  }
+
/**
* Returns the REALTIME table config in the cluster.
*/
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java
new file mode 100644
index 0000000000..d133c5c02b
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class DedupIntegrationTest extends BaseClusterIntegrationTestSet {
+
+ private List<File> _avroFiles;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ // Start a customized controller with more frequent realtime segment validation
+ startController();
+ startBroker();
+ startServers(1);
+
+ _avroFiles = unpackAvroData(_tempDir);
+ startKafka();
+ pushAvroIntoKafka(_avroFiles);
+
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createDedupTableConfig(_avroFiles.get(0), "id", getNumKafkaPartitions());
+ addTableConfig(tableConfig);
+
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ dropRealtimeTable(getTableName());
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ FileUtils.deleteDirectory(_tempDir);
+ }
+
+ @Override
+ protected int getRealtimeSegmentFlushSize() {
+ // Create > 1 segments
+ return 2;
+ }
+
+ @Override
+ protected String getSchemaFileName() {
+ return "dedupIngestionTestSchema.schema";
+ }
+
+ @Override
+ protected String getSchemaName() {
+ return "dedupSchema";
+ }
+
+ @Override
+ protected String getAvroTarFileName() {
+ return "dedupIngestionTestData.tar.gz";
+ }
+
+ @Override
+ protected boolean useLlc() {
+ return true;
+ }
+
+ @Override
+ protected String getPartitionColumn() {
+ return "id";
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ // Three distinct records are expected with pk values of 100000, 100001, 100002
+ return 5;
+ }
+
+ @Test
+ public void testValues()
+ throws Exception {
+ assertEquals(getCurrentCountStarResult(), getCountStarResult());
+
+ // Validate the older value persist
+ for (int i = 0; i < getCountStarResult(); i++) {
+ Assert.assertEquals(getPinotConnection()
+ .execute("SELECT name FROM " + getTableName() + " WHERE id = " + i)
+ .getResultSet(0)
+ .getString(0),
+ "" + i);
+ }
+ }
+
+ @Test
+ public void testSegmentReload()
+ throws Exception {
+ ControllerTest.sendPostRequest(
+ StringUtil.join("/", getControllerBaseApiUrl(), "segments", getTableName(),
+ "reload?forceDownload=false"), null);
+
+ // wait for reload to finish
+ Thread.sleep(1000);
+
+ // Push data again
+ pushAvroIntoKafka(_avroFiles);
+
+ // Validate no change
+ assertEquals(getCurrentCountStarResult(), getCountStarResult());
+ for (int i = 0; i < getCountStarResult(); i++) {
+ Assert.assertEquals(getPinotConnection()
+ .execute("SELECT name FROM " + getTableName() + " WHERE id = " + i)
+ .getResultSet(0)
+ .getString(0),
+ "" + i);
+ }
+ }
+}
diff --git a/pinot-integration-tests/src/test/resources/dedupIngestionTestData.avro b/pinot-integration-tests/src/test/resources/dedupIngestionTestData.avro
new file mode 100644
index 0000000000..daf26bd4ad
Binary files /dev/null and b/pinot-integration-tests/src/test/resources/dedupIngestionTestData.avro differ
diff --git a/pinot-integration-tests/src/test/resources/dedupIngestionTestData.tar.gz b/pinot-integration-tests/src/test/resources/dedupIngestionTestData.tar.gz
new file mode 100644
index 0000000000..c868f292b0
Binary files /dev/null and b/pinot-integration-tests/src/test/resources/dedupIngestionTestData.tar.gz differ
diff --git a/pinot-integration-tests/src/test/resources/dedupIngestionTestSchema.schema b/pinot-integration-tests/src/test/resources/dedupIngestionTestSchema.schema
new file mode 100644
index 0000000000..993c42eec9
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/dedupIngestionTestSchema.schema
@@ -0,0 +1,23 @@
+{
+ "dimensionFieldSpecs": [
+ {
+ "dataType": "INT",
+ "singleValueField": true,
+ "name": "id"
+ },
+ {
+ "dataType": "STRING",
+ "singleValueField": true,
+ "name": "name"
+ }
+ ],
+ "timeFieldSpec": {
+ "incomingGranularitySpec": {
+ "timeType": "DAYS",
+ "dataType": "INT",
+ "name": "DaysSinceEpoch"
+ }
+ },
+ "primaryKeyColumns": ["id"],
+ "schemaName": "dedupSchema"
+}
\ No newline at end of file
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index 2392a0bcd1..4e74682220 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.segment.local.utils.tablestate;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -36,6 +38,14 @@ public class TableStateUtils {
private TableStateUtils() {
}
+ /**
+ * Checks whether all segments of {@code tableNameWithType} that are expected to be ONLINE on this server are loaded.
+ * This function reads the IDEALSTATE of the table and collects the segments expected to be ONLINE on this instance.
+ * Only if there is at least one such segment does it read this instance's CURRENTSTATE and verify they are all ONLINE.
+ * @param helixManager helix manager for the server instance
+ * @param tableNameWithType table name for which segment state is to be checked
+ * @return true if all segments for the given table are successfully loaded, false otherwise
+ */
public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
@@ -44,24 +54,11 @@ public class TableStateUtils {
LOGGER.warn("Failed to find ideal state for table: {}", tableNameWithType);
return false;
}
- String instanceName = helixManager.getInstanceName();
- LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
- if (liveInstance == null) {
- LOGGER.warn("Failed to find live instance for instance: {}", instanceName);
- return false;
- }
- String sessionId = liveInstance.getEphemeralOwner();
- CurrentState currentState =
- dataAccessor.getProperty(keyBuilder.currentState(instanceName, sessionId, tableNameWithType));
- if (currentState == null) {
- LOGGER.warn("Failed to find current state for instance: {}, sessionId: {}, table: {}", instanceName, sessionId,
- tableNameWithType);
- return false;
- }
- // Check if ideal state and current state matches for all segments assigned to the current instance
+ // Get all ONLINE segments from idealState
+ String instanceName = helixManager.getInstanceName();
+ List<String> onlineSegments = new ArrayList<>();
Map<String, Map<String, String>> idealStatesMap = idealState.getRecord().getMapFields();
- Map<String, String> currentStateMap = currentState.getPartitionStateMap();
for (Map.Entry<String, Map<String, String>> entry : idealStatesMap.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
@@ -70,17 +67,39 @@ public class TableStateUtils {
if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(expectedState)) {
continue;
}
- String actualState = currentStateMap.get(segmentName);
- if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(actualState)) {
- if (CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(actualState)) {
- LOGGER.error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType,
- expectedState);
- } else {
- LOGGER.info("Find unloaded segment: {}, table: {}, expected: {}, actual: {}", segmentName, tableNameWithType,
- expectedState, actualState);
- }
+ onlineSegments.add(segmentName);
+ }
+
+ if (onlineSegments.size() > 0) {
+ LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
+ if (liveInstance == null) {
+ LOGGER.warn("Failed to find live instance for instance: {}", instanceName);
+ return false;
+ }
+ String sessionId = liveInstance.getEphemeralOwner();
+ CurrentState currentState =
+ dataAccessor.getProperty(keyBuilder.currentState(instanceName, sessionId, tableNameWithType));
+ if (currentState == null) {
+ LOGGER.warn("Failed to find current state for instance: {}, sessionId: {}, table: {}", instanceName, sessionId,
+ tableNameWithType);
return false;
}
+ // Check if ideal state and current state matches for all segments assigned to the current instance
+ Map<String, String> currentStateMap = currentState.getPartitionStateMap();
+
+ for (String segmentName : onlineSegments) {
+ String actualState = currentStateMap.get(segmentName);
+ if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(actualState)) {
+ if (CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(actualState)) {
+ LOGGER.error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType,
+ CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+ } else {
+ LOGGER.info("Find unloaded segment: {}, table: {}, expected: {}, actual: {}", segmentName,
+ tableNameWithType, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE, actualState);
+ }
+ return false;
+ }
+ }
}
LOGGER.info("All segments loaded for table: {}", tableNameWithType);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org