You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/11/02 19:41:30 UTC

[incubator-pinot] branch master updated: Adding a new Server API for computing average off heap memory consumed (#6172)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b7819e1  Adding a new Server API for computing average off heap memory consumed (#6172)
b7819e1 is described below

commit b7819e1718c9ef292d622df981ef5d1392004611
Author: icefury71 <ch...@gmail.com>
AuthorDate: Mon Nov 2 11:41:18 2020 -0800

    Adding a new Server API for computing average off heap memory consumed (#6172)
    
    Adding a new API for getting the average off-heap memory consumed by the consuming segments
    of a given real-time table. This number gives the caller an accurate idea of the table's memory
    requirements (and thus helps with subsequent provisioning).
---
 .../realtime/impl/RealtimeSegmentStatsHistory.java | 30 +++++++--------
 .../impl/RealtimeSegmentStatsHistoryTest.java      | 24 ++++++++++++
 .../server/api/resources/MmapDebugResource.java    | 43 ++++++++++++++++++++++
 .../server/api/resources/TableSizeResource.java    |  4 +-
 .../pinot/server/api/resources/TablesResource.java |  8 ++--
 5 files changed, 88 insertions(+), 21 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistory.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistory.java
index 3869f4d..db750ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistory.java
@@ -240,7 +240,7 @@ public class RealtimeSegmentStatsHistory implements Serializable {
   }
 
   public synchronized boolean isEmpty() {
-    return getNumntriesToScan() == 0;
+    return getNumEntriesToScan() == 0;
   }
 
   public synchronized void setMinIntervalBetweenUpdatesMillis(long millis) {
@@ -272,7 +272,7 @@ public class RealtimeSegmentStatsHistory implements Serializable {
    * @return estimated
    */
   public synchronized int getEstimatedCardinality(@Nonnull String columnName) {
-    int numEntriesToScan = getNumntriesToScan();
+    int numEntriesToScan = getNumEntriesToScan();
     if (numEntriesToScan == 0) {
       return DEFAULT_EST_CARDINALITY;
     }
@@ -303,7 +303,7 @@ public class RealtimeSegmentStatsHistory implements Serializable {
    * @return estimated average string size
    */
   public synchronized int getEstimatedAvgColSize(@Nonnull String columnName) {
-    int numEntriesToScan = getNumntriesToScan();
+    int numEntriesToScan = getNumEntriesToScan();
     if (numEntriesToScan == 0) {
       return DEFAULT_EST_AVG_COL_SIZE;
     }
@@ -327,7 +327,7 @@ public class RealtimeSegmentStatsHistory implements Serializable {
   }
 
   public synchronized int getEstimatedRowsToIndex() {
-    int numEntriesToScan = getNumntriesToScan();
+    int numEntriesToScan = getNumEntriesToScan();
     if (numEntriesToScan == 0) {
       return DEFAULT_ROWS_TO_INDEX;
     }
@@ -341,6 +341,16 @@ public class RealtimeSegmentStatsHistory implements Serializable {
     return (numRowsIndexed > 0) ? (int) (numRowsIndexed / numEntriesToScan) : DEFAULT_ROWS_TO_INDEX;
   }
 
+  public synchronized long getLatestSegmentMemoryConsumed() {
+    if (isEmpty()) {
+      return -1;
+    }
+    // Get the last updated index
+    int latestSegmentIndex = (_cursor + _arraySize - 1) % _arraySize;
+    SegmentStats stats = getSegmentStatsAt(latestSegmentIndex);
+    return stats._memUsedBytes;
+  }
+
   public SegmentStats getSegmentStatsAt(int index) {
     return _entries[index];
   }
@@ -374,20 +384,10 @@ public class RealtimeSegmentStatsHistory implements Serializable {
     }
   }
 
-  private int getNumntriesToScan() {
+  private int getNumEntriesToScan() {
     if (isFull()) {
       return getArraySize();
     }
     return getCursor();
   }
-
-  public static void main(String[] args)
-      throws Exception {
-    RealtimeSegmentStatsHistory history = RealtimeSegmentStatsHistory.deserialzeFrom(new File("/tmp/stats.ser"));
-    System.out.println(history.toString());
-    for (int i = 0; i < history.getNumntriesToScan(); i++) {
-      SegmentStats segmentStats = history.getSegmentStatsAt(i);
-      System.out.println(segmentStats.toString());
-    }
-  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistoryTest.java
index eeb0db7..a3dcb56 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistoryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistoryTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.realtime.impl;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.util.TestUtils;
@@ -250,6 +251,29 @@ public class RealtimeSegmentStatsHistoryTest {
     Assert.assertEquals(segmentStats.getNumSeconds(), 700);
   }
 
+  @Test
+  public void testLatestConsumedMemory()
+      throws IOException, ClassNotFoundException {
+    final String tmpDir = System.getProperty("java.io.tmpdir");
+    File serializedFile = new File(tmpDir, STATS_FILE_NAME);
+    serializedFile.deleteOnExit();
+    FileUtils.deleteQuietly(serializedFile);
+    long[] memoryValues = {100, 100, 200, 400, 450, 600};
+
+    RealtimeSegmentStatsHistory history = RealtimeSegmentStatsHistory.deserialzeFrom(serializedFile);
+    Assert.assertEquals(history.getLatestSegmentMemoryConsumed(), -1);
+    RealtimeSegmentStatsHistory.SegmentStats segmentStats = null;
+
+    for (int i = 0; i < memoryValues.length; i++) {
+      segmentStats = new RealtimeSegmentStatsHistory.SegmentStats();
+      segmentStats.setMemUsedBytes(memoryValues[i]);
+      history.addSegmentStats(segmentStats);
+    }
+
+    long expectedMemUsed = memoryValues[memoryValues.length - 1];
+    Assert.assertEquals(history.getLatestSegmentMemoryConsumed(), expectedMemUsed);
+  }
+
   private static class StatsUpdater implements Runnable {
     private final RealtimeSegmentStatsHistory _statsHistory;
     private final int _numIterations;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/MmapDebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/MmapDebugResource.java
index aafcb8b..72df278 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/MmapDebugResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/MmapDebugResource.java
@@ -20,14 +20,26 @@ package org.apache.pinot.server.api.resources;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import java.util.Collections;
 import java.util.List;
+import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.common.restlet.resources.ResourceUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
 /**
@@ -37,6 +49,9 @@ import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 @Path("debug")
 public class MmapDebugResource {
 
+  @Inject
+  private ServerInstance _serverInstance;
+
   @GET
   @Path("memory/offheap")
   @ApiOperation(value = "View current off-heap allocations", notes = "Lists all off-heap allocations and their associated sizes")
@@ -45,4 +60,32 @@ public class MmapDebugResource {
   public List<String> getOffHeapSizes() {
     return PinotDataBuffer.getBufferInfo();
   }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/memory/offheap/table/{tableName}")
+  @ApiOperation(value = "Show off heap memory consumed by latest mutable segment", notes = "Returns off heap memory consumed by latest consuming segment of realtime table")
+  @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 404, message = "Table not found")})
+  public String getTableSize(
+      @ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName)
+      throws WebApplicationException {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+    if (tableType != TableType.REALTIME) {
+      throw new WebApplicationException("This api cannot be used with non real-time table: " + tableName,
+          Response.Status.BAD_REQUEST);
+    }
+
+    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
+    if (instanceDataManager == null) {
+      throw new WebApplicationException("Invalid server initialization", Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    RealtimeTableDataManager realtimeTableDataManager =
+        (RealtimeTableDataManager) instanceDataManager.getTableDataManager(tableName);
+    if (realtimeTableDataManager == null) {
+      throw new WebApplicationException("Table: " + tableName + " is not found", Response.Status.NOT_FOUND);
+    }
+
+    long memoryConsumed = realtimeTableDataManager.getStatsHistory().getLatestSegmentMemoryConsumed();
+    return ResourceUtils.convertToJsonString(Collections.singletonMap("offheapMemoryConsumed", memoryConsumed));
+  }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TableSizeResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TableSizeResource.java
index 5828a74..007ce5d 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TableSizeResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TableSizeResource.java
@@ -53,7 +53,7 @@ import org.apache.pinot.server.starter.ServerInstance;
 public class TableSizeResource {
 
   @Inject
-  ServerInstance serverInstance;
+  private ServerInstance _serverInstance;
 
   @GET
   @Produces(MediaType.APPLICATION_JSON)
@@ -64,7 +64,7 @@ public class TableSizeResource {
       @ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName,
       @ApiParam(value = "Provide detailed information") @DefaultValue("true") @QueryParam("detailed") boolean detailed)
       throws WebApplicationException {
-    InstanceDataManager instanceDataManager = serverInstance.getInstanceDataManager();
+    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
 
     if (instanceDataManager == null) {
       throw new WebApplicationException("Invalid server initialization", Response.Status.INTERNAL_SERVER_ERROR);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index d670da7..65d1a5f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -67,7 +67,7 @@ public class TablesResource {
   private static final String PEER_SEGMENT_DOWNLOAD_DIR = "peerSegmentDownloadDir";
 
   @Inject
-  ServerInstance serverInstance;
+  private ServerInstance _serverInstance;
 
   @Inject
   private AccessControlFactory _accessControlFactory;
@@ -85,10 +85,10 @@ public class TablesResource {
   }
 
   private InstanceDataManager checkGetInstanceDataManager() {
-    if (serverInstance == null) {
+    if (_serverInstance == null) {
       throw new WebApplicationException("Server initialization error. Missing server instance");
     }
-    InstanceDataManager instanceDataManager = serverInstance.getInstanceDataManager();
+    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
     if (instanceDataManager == null) {
       throw new WebApplicationException("Server initialization error. Missing data manager",
           Response.Status.INTERNAL_SERVER_ERROR);
@@ -218,7 +218,7 @@ public class TablesResource {
       // Note that two clients asking the same segment file will result in the same tar.gz files being created twice.
       // Will revisit for optimization if performance becomes an issue.
       File tmpSegmentTarDir =
-          new File(serverInstance.getInstanceDataManager().getSegmentFileDirectory(), PEER_SEGMENT_DOWNLOAD_DIR);
+          new File(_serverInstance.getInstanceDataManager().getSegmentFileDirectory(), PEER_SEGMENT_DOWNLOAD_DIR);
       tmpSegmentTarDir.mkdir();
 
       File segmentTarFile = new File(tmpSegmentTarDir, tableNameWithType + "_" + segmentName + "_" + UUID.randomUUID()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org