You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by kh...@apache.org on 2024/02/09 06:12:09 UTC
(pinot) branch master updated: Enhancing metadata API to return upsert partition to primary key count map for both controller and server APIs (#12334)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 947b47e3f4 Enhancing metadata API to return upsert partition to primary key count map for both controller and server APIs (#12334)
947b47e3f4 is described below
commit 947b47e3f49bb7434dbb6f47b6c538fa91c61084
Author: 9aman <35...@users.noreply.github.com>
AuthorDate: Fri Feb 9 11:42:03 2024 +0530
Enhancing metadata API to return upsert partition to primary key count map for both controller and server APIs (#12334)
---
.../common/restlet/resources/TableMetadataInfo.java | 10 +++++++++-
.../controller/util/ServerSegmentMetadataReader.java | 11 ++++++++++-
.../data/manager/realtime/RealtimeTableDataManager.java | 12 ++++++++++++
.../tests/models/DummyTableUpsertMetadataManager.java | 6 ++++++
.../upsert/ConcurrentMapTableUpsertMetadataManager.java | 10 ++++++++++
.../local/upsert/TableUpsertMetadataManager.java | 8 ++++++++
.../pinot/server/api/resources/TablesResource.java | 17 ++++++++++++++++-
7 files changed, 71 insertions(+), 3 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
index 27e28ab376..4a6953ac2c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
@@ -43,6 +43,7 @@ public class TableMetadataInfo {
private final Map<String, Double> _columnCardinalityMap;
private final Map<String, Double> _maxNumMultiValuesMap;
private final Map<String, Map<String, Double>> _columnIndexSizeMap;
+ private final Map<Integer, Map<String, Long>> _upsertPartitionToServerPrimaryKeyCountMap;
@JsonCreator
public TableMetadataInfo(@JsonProperty("tableName") String tableName,
@@ -50,7 +51,9 @@ public class TableMetadataInfo {
@JsonProperty("numRows") long numRows, @JsonProperty("columnLengthMap") Map<String, Double> columnLengthMap,
@JsonProperty("columnCardinalityMap") Map<String, Double> columnCardinalityMap,
@JsonProperty("maxNumMultiValuesMap") Map<String, Double> maxNumMultiValuesMap,
- @JsonProperty("columnIndexSizeMap") Map<String, Map<String, Double>> columnIndexSizeMap) {
+ @JsonProperty("columnIndexSizeMap") Map<String, Map<String, Double>> columnIndexSizeMap,
+ @JsonProperty("upsertPartitionToServerPrimaryKeyCountMap")
+ Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap) {
_tableName = tableName;
_diskSizeInBytes = sizeInBytes;
_numSegments = numSegments;
@@ -59,6 +62,7 @@ public class TableMetadataInfo {
_columnCardinalityMap = columnCardinalityMap;
_maxNumMultiValuesMap = maxNumMultiValuesMap;
_columnIndexSizeMap = columnIndexSizeMap;
+ _upsertPartitionToServerPrimaryKeyCountMap = upsertPartitionToServerPrimaryKeyCountMap;
}
public String getTableName() {
@@ -92,4 +96,8 @@ public class TableMetadataInfo {
public Map<String, Map<String, Double>> getColumnIndexSizeMap() {
return _columnIndexSizeMap;
}
+
+ public Map<Integer, Map<String, Long>> getUpsertPartitionToServerPrimaryKeyCountMap() {
+ return _upsertPartitionToServerPrimaryKeyCountMap;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index f728d51635..cdcc7dc78c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -112,6 +112,7 @@ public class ServerSegmentMetadataReader {
final Map<String, Double> columnCardinalityMap = new HashMap<>();
final Map<String, Double> maxNumMultiValuesMap = new HashMap<>();
final Map<String, Map<String, Double>> columnIndexSizeMap = new HashMap<>();
+ final Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap = new HashMap<>();
for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
try {
TableMetadataInfo tableMetadataInfo =
@@ -128,6 +129,14 @@ public class ServerSegmentMetadataReader {
}
return l;
}));
+ tableMetadataInfo.getUpsertPartitionToServerPrimaryKeyCountMap().forEach(
+ (partition, serverToPrimaryKeyCount) -> upsertPartitionToServerPrimaryKeyCountMap.merge(partition,
+ new HashMap<>(serverToPrimaryKeyCount), (l, r) -> {
+ for (Map.Entry<String, Long> serverToPKCount : r.entrySet()) {
+ l.merge(serverToPKCount.getKey(), serverToPKCount.getValue(), Long::sum);
+ }
+ return l;
+ }));
} catch (IOException e) {
failedParses++;
LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e);
@@ -151,7 +160,7 @@ public class ServerSegmentMetadataReader {
TableMetadataInfo aggregateTableMetadataInfo =
new TableMetadataInfo(tableNameWithType, totalDiskSizeInBytes, totalNumSegments, totalNumRows, columnLengthMap,
- columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap);
+ columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap, upsertPartitionToServerPrimaryKeyCountMap);
if (failedParses != 0) {
LOGGER.warn("Failed to parse {} / {} aggregated segment metadata responses from servers.", failedParses,
serverUrls.size());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index d974663b20..c3cb5c603a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -708,6 +708,18 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
return _tableUpsertMetadataManager;
}
+ /**
+ * Retrieves a mapping of partition id to the primary key count for the partition.
+ *
+ * @return A {@code Map} where keys are partition id and values are count of primary keys for that specific partition.
+ */
+ public Map<Integer, Long> getUpsertPartitionToPrimaryKeyCount() {
+ if (isUpsertEnabled()) {
+ return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
+ }
+ return Collections.emptyMap();
+ }
+
/**
* Validate a schema against the table config for real-time record consumption.
* Ideally, we should validate these things when schema is added or table is created, but either of these
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
index 502834a6b6..a549389697 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.helix.HelixManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -72,6 +73,11 @@ public class DummyTableUpsertMetadataManager extends BaseTableUpsertMetadataMana
public void stop() {
}
+ @Override
+ public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+ return Collections.emptyMap();
+ }
+
@Override
public void close()
throws IOException {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index b0593f8d5f..6d48464b69 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
@@ -45,6 +46,15 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
}
}
+ @Override
+ public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+ Map<Integer, Long> partitionToPrimaryKeyCount = new HashMap<>();
+ _partitionMetadataManagerMap.forEach(
+ (partitionID, upsertMetadataManager) -> partitionToPrimaryKeyCount.put(partitionID,
+ upsertMetadataManager.getNumPrimaryKeys()));
+ return partitionToPrimaryKeyCount;
+ }
+
@Override
public void close()
throws IOException {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index 2ac107d790..3e98030d8a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;
import java.io.Closeable;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -47,5 +48,12 @@ public interface TableUpsertMetadataManager extends Closeable {
*/
void stop();
+ /**
+ * Retrieves a mapping of partition id to the primary key count for the partition.
+ *
+ * @return A {@code Map} where keys are partition id and values are count of primary keys for that specific partition
+ */
+ Map<Integer, Long> getPartitionToPrimaryKeyCount();
+
boolean isPreloading();
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 5e458bbb28..aad233bc77 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -80,6 +80,7 @@ import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -286,9 +287,23 @@ public class TablesResource {
}
}
+ // fetch partition to primary key count for realtime tables that have upsert enabled
+ Map<Integer, Long> upsertPartitionToPrimaryKeyCountMap = new HashMap<>();
+ if (tableDataManager instanceof RealtimeTableDataManager) {
+ RealtimeTableDataManager realtimeTableDataManager = (RealtimeTableDataManager) tableDataManager;
+ upsertPartitionToPrimaryKeyCountMap = realtimeTableDataManager.getUpsertPartitionToPrimaryKeyCount();
+ }
+
+ // construct upsertPartitionToServerPrimaryKeyCountMap to populate in TableMetadataInfo
+ Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap = new HashMap<>();
+ upsertPartitionToPrimaryKeyCountMap.forEach(
+ (partition, primaryKeyCount) -> upsertPartitionToServerPrimaryKeyCountMap.put(partition,
+ Map.of(instanceDataManager.getInstanceId(), primaryKeyCount)));
+
TableMetadataInfo tableMetadataInfo =
new TableMetadataInfo(tableDataManager.getTableName(), totalSegmentSizeBytes, segmentDataManagers.size(),
- totalNumRows, columnLengthMap, columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizesMap);
+ totalNumRows, columnLengthMap, columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizesMap,
+ upsertPartitionToServerPrimaryKeyCountMap);
return ResourceUtils.convertToJsonString(tableMetadataInfo);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org