You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by kh...@apache.org on 2024/02/09 06:12:09 UTC

(pinot) branch master updated: Enhancing metadata API to return upsert partition to primary key count map for both controller and server APIs (#12334)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 947b47e3f4 Enhancing metadata API to return upsert partition to primary key count map for both controller and server APIs (#12334)
947b47e3f4 is described below

commit 947b47e3f49bb7434dbb6f47b6c538fa91c61084
Author: 9aman <35...@users.noreply.github.com>
AuthorDate: Fri Feb 9 11:42:03 2024 +0530

    Enhancing metadata API to return upsert partition to primary key count map for both controller and server APIs (#12334)
---
 .../common/restlet/resources/TableMetadataInfo.java     | 10 +++++++++-
 .../controller/util/ServerSegmentMetadataReader.java    | 11 ++++++++++-
 .../data/manager/realtime/RealtimeTableDataManager.java | 12 ++++++++++++
 .../tests/models/DummyTableUpsertMetadataManager.java   |  6 ++++++
 .../upsert/ConcurrentMapTableUpsertMetadataManager.java | 10 ++++++++++
 .../local/upsert/TableUpsertMetadataManager.java        |  8 ++++++++
 .../pinot/server/api/resources/TablesResource.java      | 17 ++++++++++++++++-
 7 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
index 27e28ab376..4a6953ac2c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
@@ -43,6 +43,7 @@ public class TableMetadataInfo {
   private final Map<String, Double> _columnCardinalityMap;
   private final Map<String, Double> _maxNumMultiValuesMap;
   private final Map<String, Map<String, Double>> _columnIndexSizeMap;
+  private final Map<Integer, Map<String, Long>> _upsertPartitionToServerPrimaryKeyCountMap;
 
   @JsonCreator
   public TableMetadataInfo(@JsonProperty("tableName") String tableName,
@@ -50,7 +51,9 @@ public class TableMetadataInfo {
       @JsonProperty("numRows") long numRows, @JsonProperty("columnLengthMap") Map<String, Double> columnLengthMap,
       @JsonProperty("columnCardinalityMap") Map<String, Double> columnCardinalityMap,
       @JsonProperty("maxNumMultiValuesMap") Map<String, Double> maxNumMultiValuesMap,
-      @JsonProperty("columnIndexSizeMap") Map<String, Map<String, Double>> columnIndexSizeMap) {
+      @JsonProperty("columnIndexSizeMap") Map<String, Map<String, Double>> columnIndexSizeMap,
+      @JsonProperty("upsertPartitionToServerPrimaryKeyCountMap")
+      Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap) {
     _tableName = tableName;
     _diskSizeInBytes = sizeInBytes;
     _numSegments = numSegments;
@@ -59,6 +62,7 @@ public class TableMetadataInfo {
     _columnCardinalityMap = columnCardinalityMap;
     _maxNumMultiValuesMap = maxNumMultiValuesMap;
     _columnIndexSizeMap = columnIndexSizeMap;
+    _upsertPartitionToServerPrimaryKeyCountMap = upsertPartitionToServerPrimaryKeyCountMap;
   }
 
   public String getTableName() {
@@ -92,4 +96,8 @@ public class TableMetadataInfo {
   public Map<String, Map<String, Double>> getColumnIndexSizeMap() {
     return _columnIndexSizeMap;
   }
+
+  public Map<Integer, Map<String, Long>> getUpsertPartitionToServerPrimaryKeyCountMap() {
+    return _upsertPartitionToServerPrimaryKeyCountMap;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index f728d51635..cdcc7dc78c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -112,6 +112,7 @@ public class ServerSegmentMetadataReader {
     final Map<String, Double> columnCardinalityMap = new HashMap<>();
     final Map<String, Double> maxNumMultiValuesMap = new HashMap<>();
     final Map<String, Map<String, Double>> columnIndexSizeMap = new HashMap<>();
+    final Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap = new HashMap<>();
     for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
       try {
         TableMetadataInfo tableMetadataInfo =
@@ -128,6 +129,14 @@ public class ServerSegmentMetadataReader {
           }
           return l;
         }));
+        tableMetadataInfo.getUpsertPartitionToServerPrimaryKeyCountMap().forEach(
+            (partition, serverToPrimaryKeyCount) -> upsertPartitionToServerPrimaryKeyCountMap.merge(partition,
+                new HashMap<>(serverToPrimaryKeyCount), (l, r) -> {
+                  for (Map.Entry<String, Long> serverToPKCount : r.entrySet()) {
+                    l.merge(serverToPKCount.getKey(), serverToPKCount.getValue(), Long::sum);
+                  }
+                  return l;
+                }));
       } catch (IOException e) {
         failedParses++;
         LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e);
@@ -151,7 +160,7 @@ public class ServerSegmentMetadataReader {
 
     TableMetadataInfo aggregateTableMetadataInfo =
         new TableMetadataInfo(tableNameWithType, totalDiskSizeInBytes, totalNumSegments, totalNumRows, columnLengthMap,
-            columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap);
+            columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap, upsertPartitionToServerPrimaryKeyCountMap);
     if (failedParses != 0) {
       LOGGER.warn("Failed to parse {} / {} aggregated segment metadata responses from servers.", failedParses,
           serverUrls.size());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index d974663b20..c3cb5c603a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -708,6 +708,18 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     return _tableUpsertMetadataManager;
   }
 
+  /**
+   * Retrieves a mapping of partition id to the primary key count for the partition.
+   *
+   * @return A {@code Map} where keys are partition id and values are count of primary keys for that specific partition.
+   */
+  public Map<Integer, Long> getUpsertPartitionToPrimaryKeyCount() {
+    if (isUpsertEnabled()) {
+      return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
+    }
+    return Collections.emptyMap();
+  }
+
   /**
    * Validate a schema against the table config for real-time record consumption.
    * Ideally, we should validate these things when schema is added or table is created, but either of these
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
index 502834a6b6..a549389697 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -72,6 +73,11 @@ public class DummyTableUpsertMetadataManager extends BaseTableUpsertMetadataMana
   public void stop() {
   }
 
+  @Override
+  public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+    return Collections.emptyMap();
+  }
+
   @Override
   public void close()
       throws IOException {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index b0593f8d5f..6d48464b69 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.upsert;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.concurrent.ThreadSafe;
@@ -45,6 +46,15 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
     }
   }
 
+  @Override
+  public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+    Map<Integer, Long> partitionToPrimaryKeyCount = new HashMap<>();
+    _partitionMetadataManagerMap.forEach(
+        (partitionID, upsertMetadataManager) -> partitionToPrimaryKeyCount.put(partitionID,
+            upsertMetadataManager.getNumPrimaryKeys()));
+    return partitionToPrimaryKeyCount;
+  }
+
   @Override
   public void close()
       throws IOException {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index 2ac107d790..3e98030d8a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.upsert;
 
 import java.io.Closeable;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -47,5 +48,12 @@ public interface TableUpsertMetadataManager extends Closeable {
    */
   void stop();
 
+  /**
+   * Retrieves a mapping of partition id to the primary key count for the partition.
+   *
+   * @return A {@code Map} where keys are partition id and values are count of primary keys for that specific partition
+   */
+  Map<Integer, Long> getPartitionToPrimaryKeyCount();
+
   boolean isPreloading();
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 5e458bbb28..aad233bc77 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -80,6 +80,7 @@ import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -286,9 +287,23 @@ public class TablesResource {
       }
     }
 
+    // fetch partition to primary key count for realtime tables that have upsert enabled
+    Map<Integer, Long> upsertPartitionToPrimaryKeyCountMap = new HashMap<>();
+    if (tableDataManager instanceof RealtimeTableDataManager) {
+      RealtimeTableDataManager realtimeTableDataManager = (RealtimeTableDataManager) tableDataManager;
+      upsertPartitionToPrimaryKeyCountMap = realtimeTableDataManager.getUpsertPartitionToPrimaryKeyCount();
+    }
+
+    // construct upsertPartitionToServerPrimaryKeyCountMap to populate in TableMetadataInfo
+    Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap = new HashMap<>();
+    upsertPartitionToPrimaryKeyCountMap.forEach(
+        (partition, primaryKeyCount) -> upsertPartitionToServerPrimaryKeyCountMap.put(partition,
+            Map.of(instanceDataManager.getInstanceId(), primaryKeyCount)));
+
     TableMetadataInfo tableMetadataInfo =
         new TableMetadataInfo(tableDataManager.getTableName(), totalSegmentSizeBytes, segmentDataManagers.size(),
-            totalNumRows, columnLengthMap, columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizesMap);
+            totalNumRows, columnLengthMap, columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizesMap,
+            upsertPartitionToServerPrimaryKeyCountMap);
     return ResourceUtils.convertToJsonString(tableMetadataInfo);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org