You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2020/12/07 18:20:33 UTC
[incubator-pinot] branch master updated: API to get status of
consumption of a table (#6322)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 41a3fc4 API to get status of consumption of a table (#6322)
41a3fc4 is described below
commit 41a3fc4f7639c879a2c76e9c1e29efeaeac6e58e
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon Dec 7 10:20:15 2020 -0800
API to get status of consumption of a table (#6322)
API to get status of consumption of a table. This includes 1. consumer status 2. partition to offsets map 3. server name 4. last consumed timestamp, for consuming segments across all servers.
---
.../restlet/resources/SegmentConsumerInfo.java | 61 ++++
.../api/resources/PinotSegmentRestletResource.java | 29 ++
.../helix/core/PinotHelixResourceManager.java | 18 ++
.../util/ConsumingSegmentInfoReader.java | 167 +++++++++++
.../api/ConsumingSegmentInfoReaderTest.java | 319 +++++++++++++++++++++
.../realtime/HLRealtimeSegmentDataManager.java | 16 ++
.../realtime/LLRealtimeSegmentDataManager.java | 17 ++
.../realtime/RealtimeSegmentDataManager.java | 21 ++
.../pinot/server/api/resources/TablesResource.java | 41 +++
9 files changed, 689 insertions(+)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
new file mode 100644
index 0000000..83e0433
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+
+
+/**
+ * Information regarding the consumer of a segment
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SegmentConsumerInfo {
+ private final String _segmentName;
+ private final String _consumerState;
+ private final long _lastConsumedTimestamp;
+ private final Map<String, String> _partitionToOffsetMap;
+
+ public SegmentConsumerInfo(@JsonProperty("segmentName") String segmentName,
+ @JsonProperty("consumerState") String consumerState,
+ @JsonProperty("lastConsumedTimestamp") long lastConsumedTimestamp,
+ @JsonProperty("partitionToOffsetMap") Map<String, String> partitionToOffsetMap) {
+ _segmentName = segmentName;
+ _consumerState = consumerState;
+ _lastConsumedTimestamp = lastConsumedTimestamp;
+ _partitionToOffsetMap = partitionToOffsetMap;
+ }
+
+ public String getSegmentName() {
+ return _segmentName;
+ }
+
+ public String getConsumerState() {
+ return _consumerState;
+ }
+
+ public long getLastConsumedTimestamp() {
+ return _lastConsumedTimestamp;
+ }
+
+ public Map<String, String> getPartitionToOffsetMap() {
+ return _partitionToOffsetMap;
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 876d3a9..b62ce61 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.JsonNode;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -34,6 +36,7 @@ import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -58,7 +61,9 @@ import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.controller.util.TableMetadataReader;
+import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -542,4 +547,28 @@ public class PinotSegmentRestletResource {
return tableMetadataReader.getSegmentsMetadata(tableNameWithType,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
}
+
+ @GET
+ @Path("/tables/{realtimeTableName}/consumingSegmentsInfo")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Returns state of consuming segments", notes = "Gets the status of consumers from all servers")
+ @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 404, message = "Table not found"), @ApiResponse(code = 500, message = "Internal server error")})
+ public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsInfo(
+ @ApiParam(value = "Realtime table name with or without type", required = true, example = "myTable | myTable_REALTIME") @PathParam("realtimeTableName") String realtimeTableName) {
+ try {
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+ if (TableType.OFFLINE == tableType) {
+ throw new IllegalStateException("Cannot get consuming segments info for OFFLINE table: " + realtimeTableName);
+ }
+ String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
+ ConsumingSegmentInfoReader consumingSegmentInfoReader =
+ new ConsumingSegmentInfoReader(_executor, _connectionManager, _pinotHelixResourceManager);
+ return consumingSegmentInfoReader
+ .getConsumingSegmentsInfo(tableNameWithType, _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to get consuming segments info for table %s. %s", realtimeTableName, e.getMessage()),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 9718d3f..0c99037 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1895,6 +1895,24 @@ public class PinotHelixResourceManager {
return serverToSegmentsMap;
}
+ /**
+ * Returns a set of CONSUMING segments for the given realtime table.
+ */
+ public Set<String> getConsumingSegments(String tableNameWithType) {
+ IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
+ if (idealState == null) {
+ throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType);
+ }
+ Set<String> consumingSegments = new HashSet<>();
+ for (String segment : idealState.getPartitionSet()) {
+ Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segment);
+ if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
+ consumingSegments.add(segment);
+ }
+ }
+ return consumingSegments;
+ }
+
public synchronized Map<String, String> getSegmentsCrcForTable(String tableNameWithType) {
// Get the segment list for this table
IdealState is = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
new file mode 100644
index 0000000..e440d8b
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.BiMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a helper class that calls the server API endpoints to fetch consuming segments info
+ * Only the servers returning success are returned by the method. For servers returning errors (http error or otherwise),
+ * no entry is created in the return list
+ */
+public class ConsumingSegmentInfoReader {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentInfoReader.class);
+
+ private final Executor _executor;
+ private final HttpConnectionManager _connectionManager;
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
+
+ public ConsumingSegmentInfoReader(Executor executor, HttpConnectionManager connectionManager,
+ PinotHelixResourceManager helixResourceManager) {
+ _executor = executor;
+ _connectionManager = connectionManager;
+ _pinotHelixResourceManager = helixResourceManager;
+ }
+
+ /**
+ * This method retrieves the consuming segments info for a given realtime table.
+ * @return a map of segmentName to the information about its consumer
+ */
+ public ConsumingSegmentsInfoMap getConsumingSegmentsInfo(String tableNameWithType, int timeoutMs)
+ throws InvalidConfigException {
+ final Map<String, List<String>> serverToSegments =
+ _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+ BiMap<String, String> serverToEndpoints =
+ _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+
+ // Gets info for segments with LLRealtimeSegmentDataManager found in the table data manager
+ Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap =
+ getConsumingSegmentsInfoFromServers(tableNameWithType, serverToEndpoints, timeoutMs);
+ TreeMap<String, List<ConsumingSegmentInfo>> consumingSegmentInfoMap = new TreeMap<>();
+ for (Map.Entry<String, List<SegmentConsumerInfo>> entry : serverToSegmentConsumerInfoMap.entrySet()) {
+ String serverName = entry.getKey();
+ for (SegmentConsumerInfo info : entry.getValue()) {
+ consumingSegmentInfoMap.computeIfAbsent(info.getSegmentName(), k -> new ArrayList<>()).add(
+ new ConsumingSegmentInfo(serverName, info.getConsumerState(), info.getLastConsumedTimestamp(),
+ info.getPartitionToOffsetMap()));
+ }
+ }
+ // Segments which are in CONSUMING state but found no consumer on the server
+ Set<String> consumingSegments = _pinotHelixResourceManager.getConsumingSegments(tableNameWithType);
+ consumingSegments.forEach(c -> consumingSegmentInfoMap.putIfAbsent(c, Collections.emptyList()));
+ return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap);
+ }
+
+ /**
+ * This method makes a MultiGet call to all servers to get the consuming segments info.
+ * @return servers queried and a list of consumer status information for consuming segments on that server
+ */
+ private Map<String, List<SegmentConsumerInfo>> getConsumingSegmentsInfoFromServers(String tableNameWithType,
+ BiMap<String, String> serverToEndpoints, int timeoutMs) {
+ LOGGER.info("Reading consuming segment info from servers: {} for table: {}", serverToEndpoints.keySet(),
+ tableNameWithType);
+
+ List<String> serverUrls = new ArrayList<>(serverToEndpoints.size());
+ BiMap<String, String> endpointsToServers = serverToEndpoints.inverse();
+ for (String endpoint : endpointsToServers.keySet()) {
+ String consumingSegmentInfoURI = generateServerURL(tableNameWithType, endpoint);
+ serverUrls.add(consumingSegmentInfoURI);
+ }
+
+ CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers);
+ CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ completionServiceHelper.doMultiGetRequest(serverUrls, tableNameWithType, timeoutMs);
+ Map<String, List<SegmentConsumerInfo>> serverToConsumingSegmentInfoList = new HashMap<>();
+ int failedParses = 0;
+ for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
+ try {
+ List<SegmentConsumerInfo> segmentConsumerInfos =
+ JsonUtils.stringToObject(streamResponse.getValue(), new TypeReference<List<SegmentConsumerInfo>>() {
+ });
+ serverToConsumingSegmentInfoList.put(streamResponse.getKey(), segmentConsumerInfos);
+ } catch (IOException e) {
+ failedParses++;
+ LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e);
+ }
+ }
+ if (failedParses != 0) {
+ LOGGER.warn("Failed to parse {} / {} segment size info responses from servers.", failedParses, serverUrls.size());
+ }
+ return serverToConsumingSegmentInfoList;
+ }
+
+ private String generateServerURL(String tableNameWithType, String endpoint) {
+ return String.format("http://%s/tables/%s/consumingSegmentsInfo", endpoint, tableNameWithType);
+ }
+
+ /**
+ * Map containing all consuming segments and their status information
+ */
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ static public class ConsumingSegmentsInfoMap {
+ public TreeMap<String, List<ConsumingSegmentInfo>> _segmentToConsumingInfoMap;
+
+ public ConsumingSegmentsInfoMap(
+ @JsonProperty("segmentToConsumingInfoMap") TreeMap<String, List<ConsumingSegmentInfo>> segmentToConsumingInfoMap) {
+ this._segmentToConsumingInfoMap = segmentToConsumingInfoMap;
+ }
+ }
+
+ /**
+ * Contains all the information about a consuming segment
+ */
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ static public class ConsumingSegmentInfo {
+ public String _serverName;
+ public String _consumerState;
+ public long _lastConsumedTimestamp;
+ public Map<String, String> _partitionToOffsetMap;
+
+ public ConsumingSegmentInfo(@JsonProperty("serverName") String serverName,
+ @JsonProperty("consumerState") String consumerState,
+ @JsonProperty("lastConsumedTimestamp") long lastConsumedTimestamp,
+ @JsonProperty("partitionToOffsetMap") Map<String, String> partitionToOffsetMap) {
+ _serverName = serverName;
+ _consumerState = consumerState;
+ _lastConsumedTimestamp = lastConsumedTimestamp;
+ _partitionToOffsetMap = partitionToOffsetMap;
+ }
+ }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java
new file mode 100644
index 0000000..e09c255
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.ConsumerState;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.mockito.ArgumentMatchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests the {@link ConsumingSegmentInfoReader}
+ */
+public class ConsumingSegmentInfoReaderTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentInfoReaderTest.class);
+ private final Executor executor = Executors.newFixedThreadPool(1);
+ private final HttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
+ private PinotHelixResourceManager helix;
+ private final Map<String, FakeConsumingInfoServer> serverMap = new HashMap<>();
+ private final int timeoutMsec = 10000;
+
+ private static final String TABLE_NAME = "myTable_REALTIME";
+ private static final String SEGMENT_NAME_PARTITION_0 = "table__0__29__12345";
+ private static final String SEGMENT_NAME_PARTITION_1 = "table__1__32__12345";
+
+ @BeforeClass
+ public void setUp()
+ throws IOException {
+ helix = mock(PinotHelixResourceManager.class);
+ String uriPath = "/tables/";
+
+ // server0 - 1 consumer each for p0 and p1. CONSUMING.
+ Map<String, String> partitionToOffset0 = new HashMap<>();
+ partitionToOffset0.put("0", "150");
+ Map<String, String> partitionToOffset1 = new HashMap<>();
+ partitionToOffset1.put("1", "150");
+ FakeConsumingInfoServer s0 = new FakeConsumingInfoServer(Lists
+ .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0),
+ new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+ s0.start(uriPath, createHandler(200, s0.consumerInfos, 0));
+ serverMap.put("server0", s0);
+
+ // server1 - 1 consumer each for p0 and p1. CONSUMING.
+ FakeConsumingInfoServer s1 = new FakeConsumingInfoServer(Lists
+ .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0),
+ new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+ s1.start(uriPath, createHandler(200, s1.consumerInfos, 0));
+ serverMap.put("server1", s1);
+
+ // server2 - p1 consumer CONSUMING. p0 consumer NOT_CONSUMING
+ FakeConsumingInfoServer s2 = new FakeConsumingInfoServer(Lists
+ .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "NOT_CONSUMING", 0, partitionToOffset0),
+ new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+ s2.start(uriPath, createHandler(200, s2.consumerInfos, 0));
+ serverMap.put("server2", s2);
+
+ // server3 - 1 consumer for p1. No consumer for p0
+ FakeConsumingInfoServer s3 = new FakeConsumingInfoServer(
+ Lists.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+ s3.start(uriPath, createHandler(200, s3.consumerInfos, 0));
+ serverMap.put("server3", s3);
+
+ // server4 - unreachable/error/timeout
+ FakeConsumingInfoServer s4 = new FakeConsumingInfoServer(Lists
+ .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0),
+ new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+ s4.start(uriPath, createHandler(200, s4.consumerInfos, timeoutMsec * 1000));
+ serverMap.put("server4", s4);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ for (Map.Entry<String, FakeConsumingInfoServer> fakeServerEntry : serverMap.entrySet()) {
+ fakeServerEntry.getValue().httpServer.stop(0);
+ }
+ }
+
+ private HttpHandler createHandler(final int status, final List<SegmentConsumerInfo> consumerInfos,
+ final int sleepTimeMs) {
+ return httpExchange -> {
+ if (sleepTimeMs > 0) {
+ try {
+ Thread.sleep(sleepTimeMs);
+ } catch (InterruptedException e) {
+ LOGGER.info("Handler interrupted during sleep");
+ }
+ }
+ String json = JsonUtils.objectToString(consumerInfos);
+ httpExchange.sendResponseHeaders(status, json.length());
+ OutputStream responseBody = httpExchange.getResponseBody();
+ responseBody.write(json.getBytes());
+ responseBody.close();
+ };
+ }
+
+ /**
+ * Server to return fake consuming segment info
+ */
+ private static class FakeConsumingInfoServer {
+ String endpoint;
+ InetSocketAddress socket = new InetSocketAddress(0);
+ List<SegmentConsumerInfo> consumerInfos;
+ HttpServer httpServer;
+
+ FakeConsumingInfoServer(List<SegmentConsumerInfo> consumerInfos) {
+ this.consumerInfos = consumerInfos;
+ }
+
+ private void start(String path, HttpHandler handler)
+ throws IOException {
+ httpServer = HttpServer.create(socket, 0);
+ httpServer.createContext(path, handler);
+ new Thread(() -> httpServer.start()).start();
+ endpoint = "localhost:" + httpServer.getAddress().getPort();
+ }
+ }
+
+ private Map<String, List<String>> subsetOfServerSegments(String... servers) {
+ Map<String, List<String>> subset = new HashMap<>();
+ for (String server : servers) {
+ subset.put(server, serverMap.get(server).consumerInfos.stream().map(SegmentConsumerInfo::getSegmentName)
+ .collect(Collectors.toList()));
+ }
+ return subset;
+ }
+
+ private BiMap<String, String> serverEndpoints(String... servers) {
+ BiMap<String, String> endpoints = HashBiMap.create(servers.length);
+ for (String server : servers) {
+ endpoints.put(server, serverMap.get(server).endpoint);
+ }
+ return endpoints;
+ }
+
+ private ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner(final String[] servers,
+ final Set<String> consumingSegments, String table)
+ throws InvalidConfigException {
+ when(helix.getServerToSegmentsMap(anyString())).thenAnswer(invocationOnMock -> subsetOfServerSegments(servers));
+ when(helix.getDataInstanceAdminEndpoints(ArgumentMatchers.anySet()))
+ .thenAnswer(invocationOnMock -> serverEndpoints(servers));
+ when(helix.getConsumingSegments(anyString())).thenAnswer(invocationOnMock -> consumingSegments);
+ ConsumingSegmentInfoReader reader = new ConsumingSegmentInfoReader(executor, connectionManager, helix);
+ return reader.getConsumingSegmentsInfo(table, timeoutMsec);
+ }
+
+ @Test
+ public void testEmptyTable()
+ throws InvalidConfigException {
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+ testRunner(new String[]{}, Collections.emptySet(), TABLE_NAME);
+ Assert.assertTrue(consumingSegmentsInfoMap._segmentToConsumingInfoMap.isEmpty());
+ }
+
+ /**
+ * 2 servers, 2 partitions, 2 replicas, all CONSUMING
+ */
+ @Test
+ public void testHappyPath()
+ throws InvalidConfigException {
+ final String[] servers = {"server0", "server1"};
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+ testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME);
+
+ List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
+ consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
+ Assert.assertEquals(consumingSegmentInfos.size(), 2);
+ for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
+ checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server1"), ConsumerState.CONSUMING.toString(), "0",
+ "150");
+ }
+ consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
+ Assert.assertEquals(consumingSegmentInfos.size(), 2);
+ for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
+ checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server1"), ConsumerState.CONSUMING.toString(), "1",
+ "150");
+ }
+ }
+
+ /**
+ * 2 servers, 2 partitions, 2 replicas. p0 consumer in NOT_CONSUMING
+ */
+ @Test
+ public void testNotConsumingState()
+ throws InvalidConfigException {
+ final String[] servers = {"server0", "server2"};
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+ testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME);
+ List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
+ consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
+ Assert.assertEquals(consumingSegmentInfos.size(), 2);
+ for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
+ if (info._serverName.equals("server0")) {
+ checkConsumingSegmentInfo(info, Sets.newHashSet("server0"), ConsumerState.CONSUMING.toString(), "0", "150");
+ } else {
+ checkConsumingSegmentInfo(info, Sets.newHashSet("server2"), ConsumerState.NOT_CONSUMING.toString(), "0", "150");
+ }
+ }
+ consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
+ Assert.assertEquals(consumingSegmentInfos.size(), 2);
+ for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
+ checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server2"), ConsumerState.CONSUMING.toString(), "1",
+ "150");
+ }
+ }
+
+ /**
+ * 1 servers, 2 partitions, 1 replicas. No consumer for p0. CONSUMING state in idealstate.
+ */
+ @Test
+ public void testNoConsumerButConsumingInIdealState()
+ throws InvalidConfigException {
+ final String[] servers = {"server3"};
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+ testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME);
+ List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
+ consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
+ Assert.assertTrue(consumingSegmentInfos.isEmpty());
+
+ consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
+ Assert.assertEquals(consumingSegmentInfos.size(), 1);
+ checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server3"),
+ ConsumerState.CONSUMING.toString(), "1", "150");
+ }
+
+ /**
+ * 1 servers, 2 partitions, 1 replicas. No consumer for p0. OFFLINE state in idealstate.
+ */
+ @Test
+ public void testNoConsumerOfflineInIdealState()
+ throws InvalidConfigException {
+ final String[] servers = {"server3"};
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+ testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_1), TABLE_NAME);
+ List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
+ consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
+ Assert.assertNull(consumingSegmentInfos);
+
+ consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
+ Assert.assertEquals(consumingSegmentInfos.size(), 1);
+ checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server3"),
+ ConsumerState.CONSUMING.toString(), "1", "150");
+ }
+
+ /**
+ * 2 servers, 2 partitions, 2 replicas. server4 times out.
+ */
+ @Test
+ public void testErrorFromServer()
+ throws InvalidConfigException {
+ final String[] servers = {"server0", "server4"};
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+ testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME);
+ List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
+ consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
+ Assert.assertEquals(consumingSegmentInfos.size(), 1);
+ checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server0"),
+ ConsumerState.CONSUMING.toString(), "0", "150");
+
+ consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
+ Assert.assertEquals(consumingSegmentInfos.size(), 1);
+ checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server0"),
+ ConsumerState.CONSUMING.toString(), "1", "150");
+ }
+
+ private void checkConsumingSegmentInfo(ConsumingSegmentInfoReader.ConsumingSegmentInfo info, Set<String> serverNames,
+ String consumerState, String partition, String offset) {
+ Assert.assertTrue(serverNames.contains(info._serverName));
+ Assert.assertEquals(info._consumerState, consumerState);
+ Assert.assertEquals(info._partitionToOffsetMap.get(partition), offset);
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 060f19b..ab608fd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -25,6 +25,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
@@ -417,6 +418,21 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
@Override
+ public Map<String, String> getPartitionToCurrentOffset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ConsumerState getConsumerState() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getLastConsumedTimestamp() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public String getSegmentName() {
return _segmentName;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 13deed5..62e6d5b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -701,6 +701,23 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
}
+ @Override
+ public Map<String, String> getPartitionToCurrentOffset() {
+ Map<String, String> partitionToCurrentOffset = new HashMap<>();
+ partitionToCurrentOffset.put(String.valueOf(_streamPartitionId), _currentOffset.toString());
+ return partitionToCurrentOffset;
+ }
+
+ @Override
+ public ConsumerState getConsumerState() {
+ return _state == State.ERROR ? ConsumerState.NOT_CONSUMING : ConsumerState.CONSUMING;
+ }
+
+ @Override
+ public long getLastConsumedTimestamp() {
+ return _lastLogTime;
+ }
+
@VisibleForTesting
protected StreamPartitionMsgOffset getCurrentOffset() {
return _currentOffset;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 6d95ffa..0086c4f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.data.manager.realtime;
+import java.util.Map;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
@@ -28,6 +29,14 @@ import org.apache.pinot.core.io.writer.impl.MmapMemoryManager;
public abstract class RealtimeSegmentDataManager extends SegmentDataManager {
+ /**
+ * The state of the consumer of this segment
+ */
+ public enum ConsumerState {
+ CONSUMING,
+ NOT_CONSUMING // In error state
+ }
+
@Override
public abstract MutableSegment getSegment();
@@ -41,4 +50,16 @@ public abstract class RealtimeSegmentDataManager extends SegmentDataManager {
return new DirectMemoryManager(segmentName, serverMetrics);
}
}
+
+ /**
+ * Get the current offsets for all partitions of this consumer
+ */
+ public abstract Map<String, String> getPartitionToCurrentOffset();
+
+ /**
+ * Get the state of the consumer
+ */
+ public abstract ConsumerState getConsumerState();
+
+ public abstract long getLastConsumedTimestamp();
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 65d1a5f..e5d533c 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -45,6 +45,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
import org.apache.pinot.common.restlet.resources.ResourceUtils;
import org.apache.pinot.common.restlet.resources.TableSegments;
import org.apache.pinot.common.restlet.resources.TablesList;
@@ -52,10 +53,13 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.server.api.access.AccessControl;
import org.apache.pinot.server.api.access.AccessControlFactory;
import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -239,4 +243,41 @@ public class TablesResource {
tableDataManager.releaseSegment(segmentDataManager);
}
}
+
+ @GET
+ @Path("tables/{realtimeTableName}/consumingSegmentsInfo")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get the info for consumers of this REALTIME table", notes = "Get consumers info from the table data manager")
+ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
+ @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName") String realtimeTableName) {
+
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+ if (TableType.OFFLINE == tableType) {
+ throw new WebApplicationException("Cannot get consuming segment info for OFFLINE table: " + realtimeTableName);
+ }
+ String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
+
+ List<SegmentConsumerInfo> segmentConsumerInfoList = new ArrayList<>();
+ TableDataManager tableDataManager = checkGetTableDataManager(tableNameWithType);
+ List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireAllSegments();
+ try {
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+ RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
+ String segmentName = segmentDataManager.getSegmentName();
+ segmentConsumerInfoList.add(
+ new SegmentConsumerInfo(segmentName, realtimeSegmentDataManager.getConsumerState().toString(),
+ realtimeSegmentDataManager.getLastConsumedTimestamp(),
+ realtimeSegmentDataManager.getPartitionToCurrentOffset()));
+ }
+ }
+ } catch (Exception e) {
+ throw new WebApplicationException("Caught exception when getting consumer info for table: " + realtimeTableName);
+ } finally {
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
+ }
+ return segmentConsumerInfoList;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org