Posted to commits@pinot.apache.org by ne...@apache.org on 2020/12/07 18:20:33 UTC

[incubator-pinot] branch master updated: API to get status of consumption of a table (#6322)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 41a3fc4  API to get status of consumption of a table (#6322)
41a3fc4 is described below

commit 41a3fc4f7639c879a2c76e9c1e29efeaeac6e58e
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon Dec 7 10:20:15 2020 -0800

    API to get status of consumption of a table (#6322)
    
    API to get status of consumption of a table. For each consuming segment across all servers, this includes: 1. consumer status, 2. partition-to-offset map, 3. server name, 4. last consumed timestamp.
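
For reference, the new controller endpoint can be exercised directly. A minimal sketch, assuming a controller at localhost:9000 (host, port and table name are placeholders; Java 11's HttpClient is used only for brevity):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConsumingSegmentsInfoClient {
  public static void main(String[] args) throws Exception {
    // The table name may be passed with or without the _REALTIME suffix.
    String url = "http://localhost:9000/tables/myTable/consumingSegmentsInfo";
    HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // The body is a JSON map from segment name to a list of per-server consumer info
    // (server name, consumer state, last consumed timestamp, partition-to-offset map).
    System.out.println(response.statusCode() + ": " + response.body());
  }
}
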
---
 .../restlet/resources/SegmentConsumerInfo.java     |  61 ++++
 .../api/resources/PinotSegmentRestletResource.java |  29 ++
 .../helix/core/PinotHelixResourceManager.java      |  18 ++
 .../util/ConsumingSegmentInfoReader.java           | 167 +++++++++++
 .../api/ConsumingSegmentInfoReaderTest.java        | 319 +++++++++++++++++++++
 .../realtime/HLRealtimeSegmentDataManager.java     |  16 ++
 .../realtime/LLRealtimeSegmentDataManager.java     |  17 ++
 .../realtime/RealtimeSegmentDataManager.java       |  21 ++
 .../pinot/server/api/resources/TablesResource.java |  41 +++
 9 files changed, 689 insertions(+)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
new file mode 100644
index 0000000..83e0433
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+
+
+/**
+ * Information regarding the consumer of a segment
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SegmentConsumerInfo {
+  private final String _segmentName;
+  private final String _consumerState;
+  private final long _lastConsumedTimestamp;
+  private final Map<String, String> _partitionToOffsetMap;
+
+  public SegmentConsumerInfo(@JsonProperty("segmentName") String segmentName,
+      @JsonProperty("consumerState") String consumerState,
+      @JsonProperty("lastConsumedTimestamp") long lastConsumedTimestamp,
+      @JsonProperty("partitionToOffsetMap") Map<String, String> partitionToOffsetMap) {
+    _segmentName = segmentName;
+    _consumerState = consumerState;
+    _lastConsumedTimestamp = lastConsumedTimestamp;
+    _partitionToOffsetMap = partitionToOffsetMap;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  public String getConsumerState() {
+    return _consumerState;
+  }
+
+  public long getLastConsumedTimestamp() {
+    return _lastConsumedTimestamp;
+  }
+
+  public Map<String, String> getPartitionToOffsetMap() {
+    return _partitionToOffsetMap;
+  }
+}
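
The POJO above is a plain Jackson-serializable holder; the @JsonProperty constructor drives both directions. A minimal round-trip sketch using the same JsonUtils helper the rest of the patch relies on (segment name, timestamp and offsets below are made up):

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Collections;
import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
import org.apache.pinot.spi.utils.JsonUtils;

public class SegmentConsumerInfoExample {
  public static void main(String[] args) throws Exception {
    SegmentConsumerInfo info = new SegmentConsumerInfo("myTable__0__29__20201207T1800Z", "CONSUMING",
        1607365215000L, Collections.singletonMap("0", "150"));
    // Serialize to JSON, then parse it back into the POJO.
    String json = JsonUtils.objectToString(info);
    SegmentConsumerInfo parsed = JsonUtils.stringToObject(json, new TypeReference<SegmentConsumerInfo>() {
    });
    System.out.println(json);
    System.out.println(parsed.getPartitionToOffsetMap());
  }
}
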
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 876d3a9..b62ce61 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.JsonNode;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,6 +36,7 @@ import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -58,7 +61,9 @@ import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
 import org.apache.pinot.controller.util.TableMetadataReader;
+import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -542,4 +547,28 @@ public class PinotSegmentRestletResource {
     return tableMetadataReader.getSegmentsMetadata(tableNameWithType,
         _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
   }
+
+  @GET
+  @Path("/tables/{realtimeTableName}/consumingSegmentsInfo")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Returns state of consuming segments", notes = "Gets the status of consumers from all servers")
+  @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 404, message = "Table not found"), @ApiResponse(code = 500, message = "Internal server error")})
+  public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsInfo(
+      @ApiParam(value = "Realtime table name with or without type", required = true, example = "myTable | myTable_REALTIME") @PathParam("realtimeTableName") String realtimeTableName) {
+    try {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+      if (TableType.OFFLINE == tableType) {
+        throw new IllegalStateException("Cannot get consuming segments info for OFFLINE table: " + realtimeTableName);
+      }
+      String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
+      ConsumingSegmentInfoReader consumingSegmentInfoReader =
+          new ConsumingSegmentInfoReader(_executor, _connectionManager, _pinotHelixResourceManager);
+      return consumingSegmentInfoReader
+          .getConsumingSegmentsInfo(tableNameWithType, _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to get consuming segments info for table %s. %s", realtimeTableName, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
 }
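
The endpoint accepts the table name with or without the type suffix: an explicit OFFLINE name is rejected, and anything else is normalized to the REALTIME name before the lookup. A small sketch of the normalization this relies on (expected TableNameBuilder behavior; verify against the class if in doubt):

import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;

public class TableNameNormalizationSketch {
  public static void main(String[] args) {
    // null for a raw table name, the parsed type when a suffix is present.
    System.out.println(TableNameBuilder.getTableTypeFromTableName("myTable"));           // null
    System.out.println(TableNameBuilder.getTableTypeFromTableName("myTable_REALTIME"));  // REALTIME
    // Both spellings end up as the same table name with type.
    System.out.println(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType("myTable"));
    System.out.println(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType("myTable_REALTIME"));
  }
}
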
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 9718d3f..0c99037 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1895,6 +1895,24 @@ public class PinotHelixResourceManager {
     return serverToSegmentsMap;
   }
 
+  /**
+   * Returns a set of CONSUMING segments for the given realtime table.
+   */
+  public Set<String> getConsumingSegments(String tableNameWithType) {
+    IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
+    if (idealState == null) {
+      throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType);
+    }
+    Set<String> consumingSegments = new HashSet<>();
+    for (String segment : idealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segment);
+      if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
+        consumingSegments.add(segment);
+      }
+    }
+    return consumingSegments;
+  }
+
   public synchronized Map<String, String> getSegmentsCrcForTable(String tableNameWithType) {
     // Get the segment list for this table
     IdealState is = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
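
getConsumingSegments() keeps every segment whose ideal-state instance map contains at least one CONSUMING replica. A simplified illustration with plain maps standing in for the Helix IdealState content:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ConsumingSegmentsFilterSketch {
  public static void main(String[] args) {
    // segment -> (instance -> state), mimicking IdealState.getInstanceStateMap() per segment.
    Map<String, Map<String, String>> instanceStates = new HashMap<>();
    instanceStates.put("myTable__0__29__12345", Map.of("Server_1", "CONSUMING", "Server_2", "CONSUMING"));
    instanceStates.put("myTable__0__28__12345", Map.of("Server_1", "ONLINE", "Server_2", "ONLINE"));

    Set<String> consumingSegments = new HashSet<>();
    for (Map.Entry<String, Map<String, String>> entry : instanceStates.entrySet()) {
      if (entry.getValue().containsValue("CONSUMING")) {
        consumingSegments.add(entry.getKey());
      }
    }
    System.out.println(consumingSegments);  // only the segment that is still being consumed
  }
}
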
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
new file mode 100644
index 0000000..e440d8b
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.BiMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class that calls the server API endpoints to fetch consuming segment info for a table.
+ * Only servers that return a successful response contribute entries to the result; servers that return an error
+ * (HTTP or otherwise) are omitted.
+ */
+public class ConsumingSegmentInfoReader {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentInfoReader.class);
+
+  private final Executor _executor;
+  private final HttpConnectionManager _connectionManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+
+  public ConsumingSegmentInfoReader(Executor executor, HttpConnectionManager connectionManager,
+      PinotHelixResourceManager helixResourceManager) {
+    _executor = executor;
+    _connectionManager = connectionManager;
+    _pinotHelixResourceManager = helixResourceManager;
+  }
+
+  /**
+   * This method retrieves the consuming segments info for a given realtime table.
+   * @return a map of segmentName to the information about its consumer
+   */
+  public ConsumingSegmentsInfoMap getConsumingSegmentsInfo(String tableNameWithType, int timeoutMs)
+      throws InvalidConfigException {
+    final Map<String, List<String>> serverToSegments =
+        _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    BiMap<String, String> serverToEndpoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+
+    // Fetch info from each server for segments that have an LLRealtimeSegmentDataManager in its table data manager
+    Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap =
+        getConsumingSegmentsInfoFromServers(tableNameWithType, serverToEndpoints, timeoutMs);
+    TreeMap<String, List<ConsumingSegmentInfo>> consumingSegmentInfoMap = new TreeMap<>();
+    for (Map.Entry<String, List<SegmentConsumerInfo>> entry : serverToSegmentConsumerInfoMap.entrySet()) {
+      String serverName = entry.getKey();
+      for (SegmentConsumerInfo info : entry.getValue()) {
+        consumingSegmentInfoMap.computeIfAbsent(info.getSegmentName(), k -> new ArrayList<>()).add(
+            new ConsumingSegmentInfo(serverName, info.getConsumerState(), info.getLastConsumedTimestamp(),
+                info.getPartitionToOffsetMap()));
+      }
+    }
+    // Segments that are CONSUMING in the ideal state but for which no server reported a consumer
+    Set<String> consumingSegments = _pinotHelixResourceManager.getConsumingSegments(tableNameWithType);
+    consumingSegments.forEach(c -> consumingSegmentInfoMap.putIfAbsent(c, Collections.emptyList()));
+    return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap);
+  }
+
+  /**
+   * This method makes a MultiGet call to all servers to get the consuming segments info.
+   * @return map from each server that responded successfully to the consumer status info for its consuming segments
+   */
+  private Map<String, List<SegmentConsumerInfo>> getConsumingSegmentsInfoFromServers(String tableNameWithType,
+      BiMap<String, String> serverToEndpoints, int timeoutMs) {
+    LOGGER.info("Reading consuming segment info from servers: {} for table: {}", serverToEndpoints.keySet(),
+        tableNameWithType);
+
+    List<String> serverUrls = new ArrayList<>(serverToEndpoints.size());
+    BiMap<String, String> endpointsToServers = serverToEndpoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String consumingSegmentInfoURI = generateServerURL(tableNameWithType, endpoint);
+      serverUrls.add(consumingSegmentInfoURI);
+    }
+
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers);
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, tableNameWithType, timeoutMs);
+    Map<String, List<SegmentConsumerInfo>> serverToConsumingSegmentInfoList = new HashMap<>();
+    int failedParses = 0;
+    for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
+      try {
+        List<SegmentConsumerInfo> segmentConsumerInfos =
+            JsonUtils.stringToObject(streamResponse.getValue(), new TypeReference<List<SegmentConsumerInfo>>() {
+            });
+        serverToConsumingSegmentInfoList.put(streamResponse.getKey(), segmentConsumerInfos);
+      } catch (IOException e) {
+        failedParses++;
+        LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e);
+      }
+    }
+    if (failedParses != 0) {
+      LOGGER.warn("Failed to parse {} / {} segment size info responses from servers.", failedParses, serverUrls.size());
+    }
+    return serverToConsumingSegmentInfoList;
+  }
+
+  private String generateServerURL(String tableNameWithType, String endpoint) {
+    return String.format("http://%s/tables/%s/consumingSegmentsInfo", endpoint, tableNameWithType);
+  }
+
+  /**
+   * Map containing all consuming segments and their status information
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class ConsumingSegmentsInfoMap {
+    public TreeMap<String, List<ConsumingSegmentInfo>> _segmentToConsumingInfoMap;
+
+    public ConsumingSegmentsInfoMap(
+        @JsonProperty("segmentToConsumingInfoMap") TreeMap<String, List<ConsumingSegmentInfo>> segmentToConsumingInfoMap) {
+      this._segmentToConsumingInfoMap = segmentToConsumingInfoMap;
+    }
+  }
+
+  /**
+   * Contains all the information about a consuming segment
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class ConsumingSegmentInfo {
+    public String _serverName;
+    public String _consumerState;
+    public long _lastConsumedTimestamp;
+    public Map<String, String> _partitionToOffsetMap;
+
+    public ConsumingSegmentInfo(@JsonProperty("serverName") String serverName,
+        @JsonProperty("consumerState") String consumerState,
+        @JsonProperty("lastConsumedTimestamp") long lastConsumedTimestamp,
+        @JsonProperty("partitionToOffsetMap") Map<String, String> partitionToOffsetMap) {
+      _serverName = serverName;
+      _consumerState = consumerState;
+      _lastConsumedTimestamp = lastConsumedTimestamp;
+      _partitionToOffsetMap = partitionToOffsetMap;
+    }
+  }
+}
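
Because servers that error out contribute no entries, a segment that is CONSUMING in the ideal state but carries an empty info list is a useful health signal. A minimal sketch of driving the reader and flagging such segments (the executor, connection manager and resource manager are assumed to come from the surrounding controller context; the 10s timeout is arbitrary):

import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;

public class ConsumingSegmentsHealthCheckSketch {
  // Flags segments that the ideal state marks CONSUMING but for which no server reported a consumer.
  static void printSegmentsWithoutConsumers(PinotHelixResourceManager helixResourceManager, String tableNameWithType)
      throws Exception {
    Executor executor = Executors.newFixedThreadPool(1);
    HttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
    ConsumingSegmentInfoReader reader =
        new ConsumingSegmentInfoReader(executor, connectionManager, helixResourceManager);
    ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap infoMap =
        reader.getConsumingSegmentsInfo(tableNameWithType, 10_000);
    for (Map.Entry<String, List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> entry
        : infoMap._segmentToConsumingInfoMap.entrySet()) {
      if (entry.getValue().isEmpty()) {
        System.out.println("No consumer reported for CONSUMING segment: " + entry.getKey());
      }
    }
  }
}
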
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java
new file mode 100644
index 0000000..e09c255
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.ConsumerState;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.mockito.ArgumentMatchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests the {@link ConsumingSegmentInfoReader}
+ */
+public class ConsumingSegmentInfoReaderTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentInfoReaderTest.class);
+  private final Executor executor = Executors.newFixedThreadPool(1);
+  private final HttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
+  private PinotHelixResourceManager helix;
+  private final Map<String, FakeConsumingInfoServer> serverMap = new HashMap<>();
+  private final int timeoutMsec = 10000;
+
+  private static final String TABLE_NAME = "myTable_REALTIME";
+  private static final String SEGMENT_NAME_PARTITION_0 = "table__0__29__12345";
+  private static final String SEGMENT_NAME_PARTITION_1 = "table__1__32__12345";
+
+  @BeforeClass
+  public void setUp()
+      throws IOException {
+    helix = mock(PinotHelixResourceManager.class);
+    String uriPath = "/tables/";
+
+    // server0 - 1 consumer each for p0 and p1. CONSUMING.
+    Map<String, String> partitionToOffset0 = new HashMap<>();
+    partitionToOffset0.put("0", "150");
+    Map<String, String> partitionToOffset1 = new HashMap<>();
+    partitionToOffset1.put("1", "150");
+    FakeConsumingInfoServer s0 = new FakeConsumingInfoServer(Lists
+        .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0),
+            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+    s0.start(uriPath, createHandler(200, s0.consumerInfos, 0));
+    serverMap.put("server0", s0);
+
+    // server1 - 1 consumer each for p0 and p1. CONSUMING.
+    FakeConsumingInfoServer s1 = new FakeConsumingInfoServer(Lists
+        .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0),
+            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+    s1.start(uriPath, createHandler(200, s1.consumerInfos, 0));
+    serverMap.put("server1", s1);
+
+    // server2 - p1 consumer CONSUMING. p0 consumer NOT_CONSUMING
+    FakeConsumingInfoServer s2 = new FakeConsumingInfoServer(Lists
+        .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "NOT_CONSUMING", 0, partitionToOffset0),
+            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+    s2.start(uriPath, createHandler(200, s2.consumerInfos, 0));
+    serverMap.put("server2", s2);
+
+    // server3 - 1 consumer for p1. No consumer for p0
+    FakeConsumingInfoServer s3 = new FakeConsumingInfoServer(
+        Lists.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+    s3.start(uriPath, createHandler(200, s3.consumerInfos, 0));
+    serverMap.put("server3", s3);
+
+    // server4 - unreachable/error/timeout
+    FakeConsumingInfoServer s4 = new FakeConsumingInfoServer(Lists
+        .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0),
+            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+    s4.start(uriPath, createHandler(200, s4.consumerInfos, timeoutMsec * 1000));
+    serverMap.put("server4", s4);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    for (Map.Entry<String, FakeConsumingInfoServer> fakeServerEntry : serverMap.entrySet()) {
+      fakeServerEntry.getValue().httpServer.stop(0);
+    }
+  }
+
+  private HttpHandler createHandler(final int status, final List<SegmentConsumerInfo> consumerInfos,
+      final int sleepTimeMs) {
+    return httpExchange -> {
+      if (sleepTimeMs > 0) {
+        try {
+          Thread.sleep(sleepTimeMs);
+        } catch (InterruptedException e) {
+          LOGGER.info("Handler interrupted during sleep");
+        }
+      }
+      String json = JsonUtils.objectToString(consumerInfos);
+      httpExchange.sendResponseHeaders(status, json.length());
+      OutputStream responseBody = httpExchange.getResponseBody();
+      responseBody.write(json.getBytes());
+      responseBody.close();
+    };
+  }
+
+  /**
+   * Server to return fake consuming segment info
+   */
+  private static class FakeConsumingInfoServer {
+    String endpoint;
+    InetSocketAddress socket = new InetSocketAddress(0);
+    List<SegmentConsumerInfo> consumerInfos;
+    HttpServer httpServer;
+
+    FakeConsumingInfoServer(List<SegmentConsumerInfo> consumerInfos) {
+      this.consumerInfos = consumerInfos;
+    }
+
+    private void start(String path, HttpHandler handler)
+        throws IOException {
+      httpServer = HttpServer.create(socket, 0);
+      httpServer.createContext(path, handler);
+      new Thread(() -> httpServer.start()).start();
+      endpoint = "localhost:" + httpServer.getAddress().getPort();
+    }
+  }
+
+  private Map<String, List<String>> subsetOfServerSegments(String... servers) {
+    Map<String, List<String>> subset = new HashMap<>();
+    for (String server : servers) {
+      subset.put(server, serverMap.get(server).consumerInfos.stream().map(SegmentConsumerInfo::getSegmentName)
+          .collect(Collectors.toList()));
+    }
+    return subset;
+  }
+
+  private BiMap<String, String> serverEndpoints(String... servers) {
+    BiMap<String, String> endpoints = HashBiMap.create(servers.length);
+    for (String server : servers) {
+      endpoints.put(server, serverMap.get(server).endpoint);
+    }
+    return endpoints;
+  }
+
+  private ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner(final String[] servers,
+      final Set<String> consumingSegments, String table)
+      throws InvalidConfigException {
+    when(helix.getServerToSegmentsMap(anyString())).thenAnswer(invocationOnMock -> subsetOfServerSegments(servers));
+    when(helix.getDataInstanceAdminEndpoints(ArgumentMatchers.anySet()))
+        .thenAnswer(invocationOnMock -> serverEndpoints(servers));
+    when(helix.getConsumingSegments(anyString())).thenAnswer(invocationOnMock -> consumingSegments);
+    ConsumingSegmentInfoReader reader = new ConsumingSegmentInfoReader(executor, connectionManager, helix);
+    return reader.getConsumingSegmentsInfo(table, timeoutMsec);
+  }
+
+  @Test
+  public void testEmptyTable()
+      throws InvalidConfigException {
+    ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+        testRunner(new String[]{}, Collections.emptySet(), TABLE_NAME);
+    Assert.assertTrue(consumingSegmentsInfoMap._segmentToConsumingInfoMap.isEmpty());
+  }
+
+  /**
+   * 2 servers, 2 partitions, 2 replicas, all CONSUMING
+   */
+  @Test
+  public void testHappyPath()
+      throws InvalidConfigException {
+    final String[] servers = {"server0", "server1"};
+    ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+        testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME);
+
+    List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
+        consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
+    Assert.assertEquals(consumingSegmentInfos.size(), 2);
+    for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
+      checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server1"), ConsumerState.CONSUMING.toString(), "0",
+          "150");
+    }
+    consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
+    Assert.assertEquals(consumingSegmentInfos.size(), 2);
+    for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
+      checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server1"), ConsumerState.CONSUMING.toString(), "1",
+          "150");
+    }
+  }
+
+  /**
+   * 2 servers, 2 partitions, 2 replicas. p0 consumer in NOT_CONSUMING
+   */
+  @Test
+  public void testNotConsumingState()
+      throws InvalidConfigException {
+    final String[] servers = {"server0", "server2"};
+    ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+        testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME);
+    List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
+        consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
+    Assert.assertEquals(consumingSegmentInfos.size(), 2);
+    for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
+      if (info._serverName.equals("server0")) {
+        checkConsumingSegmentInfo(info, Sets.newHashSet("server0"), ConsumerState.CONSUMING.toString(), "0", "150");
+      } else {
+        checkConsumingSegmentInfo(info, Sets.newHashSet("server2"), ConsumerState.NOT_CONSUMING.toString(), "0", "150");
+      }
+    }
+    consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
+    Assert.assertEquals(consumingSegmentInfos.size(), 2);
+    for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
+      checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server2"), ConsumerState.CONSUMING.toString(), "1",
+          "150");
+    }
+  }
+
+  /**
+   * 1 server, 2 partitions, 1 replica. No consumer for p0. CONSUMING state in the ideal state.
+   */
+  @Test
+  public void testNoConsumerButConsumingInIdealState()
+      throws InvalidConfigException {
+    final String[] servers = {"server3"};
+    ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+        testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME);
+    List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
+        consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
+    Assert.assertTrue(consumingSegmentInfos.isEmpty());
+
+    consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
+    Assert.assertEquals(consumingSegmentInfos.size(), 1);
+    checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server3"),
+        ConsumerState.CONSUMING.toString(), "1", "150");
+  }
+
+  /**
+   * 1 server, 2 partitions, 1 replica. No consumer for p0. OFFLINE state in the ideal state.
+   */
+  @Test
+  public void testNoConsumerOfflineInIdealState()
+      throws InvalidConfigException {
+    final String[] servers = {"server3"};
+    ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+        testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_1), TABLE_NAME);
+    List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
+        consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
+    Assert.assertNull(consumingSegmentInfos);
+
+    consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
+    Assert.assertEquals(consumingSegmentInfos.size(), 1);
+    checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server3"),
+        ConsumerState.CONSUMING.toString(), "1", "150");
+  }
+
+  /**
+   * 2 servers, 2 partitions, 2 replicas. server4 times out.
+   */
+  @Test
+  public void testErrorFromServer()
+      throws InvalidConfigException {
+    final String[] servers = {"server0", "server4"};
+    ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+        testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME);
+    List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
+        consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
+    Assert.assertEquals(consumingSegmentInfos.size(), 1);
+    checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server0"),
+        ConsumerState.CONSUMING.toString(), "0", "150");
+
+    consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
+    Assert.assertEquals(consumingSegmentInfos.size(), 1);
+    checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server0"),
+        ConsumerState.CONSUMING.toString(), "1", "150");
+  }
+
+  private void checkConsumingSegmentInfo(ConsumingSegmentInfoReader.ConsumingSegmentInfo info, Set<String> serverNames,
+      String consumerState, String partition, String offset) {
+    Assert.assertTrue(serverNames.contains(info._serverName));
+    Assert.assertEquals(info._consumerState, consumerState);
+    Assert.assertEquals(info._partitionToOffsetMap.get(partition), offset);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 060f19b..ab608fd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
@@ -417,6 +418,21 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   }
 
   @Override
+  public Map<String, String> getPartitionToCurrentOffset() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ConsumerState getConsumerState() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getLastConsumedTimestamp() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public String getSegmentName() {
     return _segmentName;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 13deed5..62e6d5b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -701,6 +701,23 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
   }
 
+  @Override
+  public Map<String, String> getPartitionToCurrentOffset() {
+    Map<String, String> partitionToCurrentOffset = new HashMap<>();
+    partitionToCurrentOffset.put(String.valueOf(_streamPartitionId), _currentOffset.toString());
+    return partitionToCurrentOffset;
+  }
+
+  @Override
+  public ConsumerState getConsumerState() {
+    return _state == State.ERROR ? ConsumerState.NOT_CONSUMING : ConsumerState.CONSUMING;
+  }
+
+  @Override
+  public long getLastConsumedTimestamp() {
+    return _lastLogTime;
+  }
+
   @VisibleForTesting
   protected StreamPartitionMsgOffset getCurrentOffset() {
     return _currentOffset;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 6d95ffa..0086c4f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.data.manager.realtime;
 
+import java.util.Map;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
@@ -28,6 +29,14 @@ import org.apache.pinot.core.io.writer.impl.MmapMemoryManager;
 
 public abstract class RealtimeSegmentDataManager extends SegmentDataManager {
 
+  /**
+   * The state of the consumer of this segment
+   */
+  public enum ConsumerState {
+    CONSUMING,
+    NOT_CONSUMING // In error state
+  }
+
   @Override
   public abstract MutableSegment getSegment();
 
@@ -41,4 +50,16 @@ public abstract class RealtimeSegmentDataManager extends SegmentDataManager {
       return new DirectMemoryManager(segmentName, serverMetrics);
     }
   }
+
+  /**
+   * Get the current offsets for all partitions of this consumer
+   */
+  public abstract Map<String, String> getPartitionToCurrentOffset();
+
+  /**
+   * Get the state of the consumer
+   */
+  public abstract ConsumerState getConsumerState();
+
+  public abstract long getLastConsumedTimestamp();
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 65d1a5f..e5d533c 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -45,6 +45,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
 import org.apache.pinot.common.restlet.resources.ResourceUtils;
 import org.apache.pinot.common.restlet.resources.TableSegments;
 import org.apache.pinot.common.restlet.resources.TablesList;
@@ -52,10 +53,13 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.server.api.access.AccessControl;
 import org.apache.pinot.server.api.access.AccessControlFactory;
 import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -239,4 +243,41 @@ public class TablesResource {
       tableDataManager.releaseSegment(segmentDataManager);
     }
   }
+
+  @GET
+  @Path("tables/{realtimeTableName}/consumingSegmentsInfo")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the info for consumers of this REALTIME table", notes = "Get consumers info from the table data manager")
+  public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
+      @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName") String realtimeTableName) {
+
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+    if (TableType.OFFLINE == tableType) {
+      throw new WebApplicationException("Cannot get consuming segment info for OFFLINE table: " + realtimeTableName);
+    }
+    String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
+
+    List<SegmentConsumerInfo> segmentConsumerInfoList = new ArrayList<>();
+    TableDataManager tableDataManager = checkGetTableDataManager(tableNameWithType);
+    List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireAllSegments();
+    try {
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+          RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
+          String segmentName = segmentDataManager.getSegmentName();
+          segmentConsumerInfoList.add(
+              new SegmentConsumerInfo(segmentName, realtimeSegmentDataManager.getConsumerState().toString(),
+                  realtimeSegmentDataManager.getLastConsumedTimestamp(),
+                  realtimeSegmentDataManager.getPartitionToCurrentOffset()));
+        }
+      }
+    } catch (Exception e) {
+      throw new WebApplicationException("Caught exception when getting consumer info for table: " + realtimeTableName);
+    } finally {
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        tableDataManager.releaseSegment(segmentDataManager);
+      }
+    }
+    return segmentConsumerInfoList;
+  }
 }
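
The controller's ConsumingSegmentInfoReader fans out to exactly this per-server endpoint, and it can also be hit directly against a server's admin port when debugging a single server. A minimal sketch (host and port are placeholders; 8097 is the usual server admin default but depends on your config):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ServerConsumingSegmentsInfoCall {
  public static void main(String[] args) throws Exception {
    // Response is a JSON list of SegmentConsumerInfo entries
    // (segmentName, consumerState, lastConsumedTimestamp, partitionToOffsetMap) for this server only.
    String url = "http://localhost:8097/tables/myTable_REALTIME/consumingSegmentsInfo";
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(HttpRequest.newBuilder(URI.create(url)).GET().build(), HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());
  }
}
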

