You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/11/02 19:41:30 UTC
[incubator-pinot] branch master updated: Adding a new Server API
for computing average off heap memory consumed (#6172)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b7819e1 Adding a new Server API for computing average off heap memory consumed (#6172)
b7819e1 is described below
commit b7819e1718c9ef292d622df981ef5d1392004611
Author: icefury71 <ch...@gmail.com>
AuthorDate: Mon Nov 2 11:41:18 2020 -0800
Adding a new Server API for computing average off heap memory consumed (#6172)
Adds a new API for getting the average off-heap memory consumed by the consuming segments
of a given real-time table. This number gives the caller an accurate idea of memory
requirements (and thus of subsequent provisioning).
---
.../realtime/impl/RealtimeSegmentStatsHistory.java | 30 +++++++--------
.../impl/RealtimeSegmentStatsHistoryTest.java | 24 ++++++++++++
.../server/api/resources/MmapDebugResource.java | 43 ++++++++++++++++++++++
.../server/api/resources/TableSizeResource.java | 4 +-
.../pinot/server/api/resources/TablesResource.java | 8 ++--
5 files changed, 88 insertions(+), 21 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistory.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistory.java
index 3869f4d..db750ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistory.java
@@ -240,7 +240,7 @@ public class RealtimeSegmentStatsHistory implements Serializable {
}
public synchronized boolean isEmpty() {
- return getNumntriesToScan() == 0;
+ return getNumEntriesToScan() == 0;
}
public synchronized void setMinIntervalBetweenUpdatesMillis(long millis) {
@@ -272,7 +272,7 @@ public class RealtimeSegmentStatsHistory implements Serializable {
* @return estimated
*/
public synchronized int getEstimatedCardinality(@Nonnull String columnName) {
- int numEntriesToScan = getNumntriesToScan();
+ int numEntriesToScan = getNumEntriesToScan();
if (numEntriesToScan == 0) {
return DEFAULT_EST_CARDINALITY;
}
@@ -303,7 +303,7 @@ public class RealtimeSegmentStatsHistory implements Serializable {
* @return estimated average string size
*/
public synchronized int getEstimatedAvgColSize(@Nonnull String columnName) {
- int numEntriesToScan = getNumntriesToScan();
+ int numEntriesToScan = getNumEntriesToScan();
if (numEntriesToScan == 0) {
return DEFAULT_EST_AVG_COL_SIZE;
}
@@ -327,7 +327,7 @@ public class RealtimeSegmentStatsHistory implements Serializable {
}
public synchronized int getEstimatedRowsToIndex() {
- int numEntriesToScan = getNumntriesToScan();
+ int numEntriesToScan = getNumEntriesToScan();
if (numEntriesToScan == 0) {
return DEFAULT_ROWS_TO_INDEX;
}
@@ -341,6 +341,16 @@ public class RealtimeSegmentStatsHistory implements Serializable {
return (numRowsIndexed > 0) ? (int) (numRowsIndexed / numEntriesToScan) : DEFAULT_ROWS_TO_INDEX;
}
+ public synchronized long getLatestSegmentMemoryConsumed() {
+ if (isEmpty()) {
+ return -1;
+ }
+ // Get the last updated index
+ int latestSegmentIndex = (_cursor + _arraySize - 1) % _arraySize;
+ SegmentStats stats = getSegmentStatsAt(latestSegmentIndex);
+ return stats._memUsedBytes;
+ }
+
public SegmentStats getSegmentStatsAt(int index) {
return _entries[index];
}
@@ -374,20 +384,10 @@ public class RealtimeSegmentStatsHistory implements Serializable {
}
}
- private int getNumntriesToScan() {
+ private int getNumEntriesToScan() {
if (isFull()) {
return getArraySize();
}
return getCursor();
}
-
- public static void main(String[] args)
- throws Exception {
- RealtimeSegmentStatsHistory history = RealtimeSegmentStatsHistory.deserialzeFrom(new File("/tmp/stats.ser"));
- System.out.println(history.toString());
- for (int i = 0; i < history.getNumntriesToScan(); i++) {
- SegmentStats segmentStats = history.getSegmentStatsAt(i);
- System.out.println(segmentStats.toString());
- }
- }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistoryTest.java
index eeb0db7..a3dcb56 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistoryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentStatsHistoryTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.realtime.impl;
import java.io.File;
+import java.io.IOException;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.util.TestUtils;
@@ -250,6 +251,29 @@ public class RealtimeSegmentStatsHistoryTest {
Assert.assertEquals(segmentStats.getNumSeconds(), 700);
}
+ @Test
+ public void testLatestConsumedMemory()
+ throws IOException, ClassNotFoundException {
+ final String tmpDir = System.getProperty("java.io.tmpdir");
+ File serializedFile = new File(tmpDir, STATS_FILE_NAME);
+ serializedFile.deleteOnExit();
+ FileUtils.deleteQuietly(serializedFile);
+ long[] memoryValues = {100, 100, 200, 400, 450, 600};
+
+ RealtimeSegmentStatsHistory history = RealtimeSegmentStatsHistory.deserialzeFrom(serializedFile);
+ Assert.assertEquals(history.getLatestSegmentMemoryConsumed(), -1);
+ RealtimeSegmentStatsHistory.SegmentStats segmentStats = null;
+
+ for (int i = 0; i < memoryValues.length; i++) {
+ segmentStats = new RealtimeSegmentStatsHistory.SegmentStats();
+ segmentStats.setMemUsedBytes(memoryValues[i]);
+ history.addSegmentStats(segmentStats);
+ }
+
+ long expectedMemUsed = memoryValues[memoryValues.length - 1];
+ Assert.assertEquals(history.getLatestSegmentMemoryConsumed(), expectedMemUsed);
+ }
+
private static class StatsUpdater implements Runnable {
private final RealtimeSegmentStatsHistory _statsHistory;
private final int _numIterations;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/MmapDebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/MmapDebugResource.java
index aafcb8b..72df278 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/MmapDebugResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/MmapDebugResource.java
@@ -20,14 +20,26 @@ package org.apache.pinot.server.api.resources;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import java.util.Collections;
import java.util.List;
+import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.common.restlet.resources.ResourceUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
/**
@@ -37,6 +49,9 @@ import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@Path("debug")
public class MmapDebugResource {
+ @Inject
+ private ServerInstance _serverInstance;
+
@GET
@Path("memory/offheap")
@ApiOperation(value = "View current off-heap allocations", notes = "Lists all off-heap allocations and their associated sizes")
@@ -45,4 +60,32 @@ public class MmapDebugResource {
public List<String> getOffHeapSizes() {
return PinotDataBuffer.getBufferInfo();
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/memory/offheap/table/{tableName}")
+ @ApiOperation(value = "Show off heap memory consumed by latest mutable segment", notes = "Returns off heap memory consumed by latest consuming segment of realtime table")
+ @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 404, message = "Table not found")})
+ public String getTableSize(
+ @ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName)
+ throws WebApplicationException {
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+ if (tableType != TableType.REALTIME) {
+ throw new WebApplicationException("This api cannot be used with non real-time table: " + tableName,
+ Response.Status.BAD_REQUEST);
+ }
+
+ InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
+ if (instanceDataManager == null) {
+ throw new WebApplicationException("Invalid server initialization", Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ RealtimeTableDataManager realtimeTableDataManager =
+ (RealtimeTableDataManager) instanceDataManager.getTableDataManager(tableName);
+ if (realtimeTableDataManager == null) {
+ throw new WebApplicationException("Table: " + tableName + " is not found", Response.Status.NOT_FOUND);
+ }
+
+ long memoryConsumed = realtimeTableDataManager.getStatsHistory().getLatestSegmentMemoryConsumed();
+ return ResourceUtils.convertToJsonString(Collections.singletonMap("offheapMemoryConsumed", memoryConsumed));
+ }
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TableSizeResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TableSizeResource.java
index 5828a74..007ce5d 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TableSizeResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TableSizeResource.java
@@ -53,7 +53,7 @@ import org.apache.pinot.server.starter.ServerInstance;
public class TableSizeResource {
@Inject
- ServerInstance serverInstance;
+ private ServerInstance _serverInstance;
@GET
@Produces(MediaType.APPLICATION_JSON)
@@ -64,7 +64,7 @@ public class TableSizeResource {
@ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Provide detailed information") @DefaultValue("true") @QueryParam("detailed") boolean detailed)
throws WebApplicationException {
- InstanceDataManager instanceDataManager = serverInstance.getInstanceDataManager();
+ InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
if (instanceDataManager == null) {
throw new WebApplicationException("Invalid server initialization", Response.Status.INTERNAL_SERVER_ERROR);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index d670da7..65d1a5f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -67,7 +67,7 @@ public class TablesResource {
private static final String PEER_SEGMENT_DOWNLOAD_DIR = "peerSegmentDownloadDir";
@Inject
- ServerInstance serverInstance;
+ private ServerInstance _serverInstance;
@Inject
private AccessControlFactory _accessControlFactory;
@@ -85,10 +85,10 @@ public class TablesResource {
}
private InstanceDataManager checkGetInstanceDataManager() {
- if (serverInstance == null) {
+ if (_serverInstance == null) {
throw new WebApplicationException("Server initialization error. Missing server instance");
}
- InstanceDataManager instanceDataManager = serverInstance.getInstanceDataManager();
+ InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
if (instanceDataManager == null) {
throw new WebApplicationException("Server initialization error. Missing data manager",
Response.Status.INTERNAL_SERVER_ERROR);
@@ -218,7 +218,7 @@ public class TablesResource {
// Note that two clients asking the same segment file will result in the same tar.gz files being created twice.
// Will revisit for optimization if performance becomes an issue.
File tmpSegmentTarDir =
- new File(serverInstance.getInstanceDataManager().getSegmentFileDirectory(), PEER_SEGMENT_DOWNLOAD_DIR);
+ new File(_serverInstance.getInstanceDataManager().getSegmentFileDirectory(), PEER_SEGMENT_DOWNLOAD_DIR);
tmpSegmentTarDir.mkdir();
File segmentTarFile = new File(tmpSegmentTarDir, tableNameWithType + "_" + segmentName + "_" + UUID.randomUUID()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org