You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2022/07/11 16:29:51 UTC

[pinot] branch master updated: add api to check segment storage tier (#8914)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5625beab41 add api to check segment storage tier (#8914)
5625beab41 is described below

commit 5625beab4164346d1e884d19a294adfc8fb668f9
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Mon Jul 11 09:29:45 2022 -0700

    add api to check segment storage tier (#8914)
    
    This PR adds two APIs to check the current storage tier of immutable segments on the servers. It works the same way table size is calculated, reusing the multi-get util to fetch tier info from servers in parallel.
---
 .../common/restlet/resources/TableTierInfo.java    |  60 ++++
 .../api/resources/PinotSegmentRestletResource.java |  60 ++++
 .../controller/util/ServerTableTierReader.java     |  87 ++++++
 .../pinot/controller/util/TableTierReader.java     | 129 +++++++++
 .../pinot/controller/api/TableTierReaderTest.java  | 311 +++++++++++++++++++++
 .../server/api/resources/TableTierResource.java    | 156 +++++++++++
 .../pinot/server/api/TableTierResourceTest.java    |  77 +++++
 7 files changed, 880 insertions(+)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableTierInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableTierInfo.java
new file mode 100644
index 0000000000..f1a445d471
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableTierInfo.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.Map;
+import java.util.Set;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TableTierInfo {
+  @JsonPropertyDescription("Name of table to look for segment storage tiers")
+  private final String _tableName;
+
+  @JsonPropertyDescription("Storage tiers of segments hosted on the server")
+  private final Map<String, String> _segmentTiers;
+
+  @JsonPropertyDescription("Segments that's not immutable and has no storage tier")
+  private final Set<String> _mutableSegments;
+
+  @JsonCreator
+  public TableTierInfo(@JsonProperty("tableName") String tableName,
+      @JsonProperty("segmentTiers") Map<String, String> segmentTiers,
+      @JsonProperty("mutableSegments") Set<String> mutableSegments) {
+    _tableName = tableName;
+    _segmentTiers = segmentTiers;
+    _mutableSegments = mutableSegments;
+  }
+
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public Map<String, String> getSegmentTiers() {
+    return _segmentTiers;
+  }
+
+  public Set<String> getMutableSegments() {
+    return _mutableSegments;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index a24ac50fdd..eb44d98846 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -72,6 +72,7 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
 import org.apache.pinot.controller.util.TableMetadataReader;
+import org.apache.pinot.controller.util.TableTierReader;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -92,6 +93,8 @@ import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
  *       <li>"/segments/{tableName}/crc": get a map from segment to CRC of the segment (OFFLINE table only)</li>
  *       <li>"/segments/{tableName}/{segmentName}/metadata: get the metadata for a segment</li>
  *       <li>"/segments/{tableName}/metadata: get the metadata for all segments from the server</li>
+ *       <li>"/segments/{tableName}/{segmentName}/tiers": get storage tier for the segment in the table</li>
+ *       <li>"/segments/{tableName}/tiers": get storage tier for all segments in the table</li>
  *     </ul>
  *   </li>
  *   <li>
@@ -717,6 +720,63 @@ public class PinotSegmentRestletResource {
     return segmentsMetadata;
   }
 
+  @GET
+  @Path("segments/{tableName}/tiers")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get storage tier for all segments in the given table", notes = "Get storage tier for all "
+      + "segments in the given table")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
+      @ApiResponse(code = 404, message = "Table not found")
+  })
+  public TableTierReader.TableTierDetails getTableTiers(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr) {
+    LOGGER.info("Received a request to get storage tier for all segments for table {}", tableName);
+    return getTableTierInternal(tableName, null, tableTypeStr);
+  }
+
+  @GET
+  @Path("segments/{tableName}/{segmentName}/tiers")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get storage tiers for the given segment", notes = "Get storage tiers for the given segment")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
+      @ApiResponse(code = 404, message = "Table or segment not found")
+  })
+  public TableTierReader.TableTierDetails getSegmentTiers(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr) {
+    segmentName = URIUtils.decode(segmentName);
+    LOGGER.info("Received a request to get storage tier for segment {} in table {}", segmentName, tableName);
+    return getTableTierInternal(tableName, segmentName, tableTypeStr);
+  }
+
+  private TableTierReader.TableTierDetails getTableTierInternal(String tableName, @Nullable String segmentName,
+      @Nullable String tableTypeStr) {
+    TableType tableType = Constants.validateTableType(tableTypeStr);
+    Preconditions.checkNotNull(tableType, "Table type is required to get table tiers");
+    String tableNameWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
+    TableTierReader tableTierReader = new TableTierReader(_executor, _connectionManager, _pinotHelixResourceManager);
+    TableTierReader.TableTierDetails tableTierDetails;
+    try {
+      tableTierDetails = tableTierReader.getTableTierDetails(tableNameWithType, segmentName,
+          _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+    } catch (Throwable t) {
+      throw new ControllerApplicationException(LOGGER, String
+          .format("Failed to get tier info for segment: %s in table: %s of type: %s", segmentName, tableName,
+              tableTypeStr), Response.Status.INTERNAL_SERVER_ERROR, t);
+    }
+    if (segmentName != null && !tableTierDetails.getSegmentTiers().containsKey(segmentName)) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Segment: %s is not found in table: %s of type: %s", segmentName, tableName, tableTypeStr),
+          Response.Status.NOT_FOUND);
+    }
+    return tableTierDetails;
+  }
+
   @GET
   @Path("segments/{tableName}/select")
   @Produces(MediaType.APPLICATION_JSON)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerTableTierReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerTableTierReader.java
new file mode 100644
index 0000000000..26a894c3ee
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerTableTierReader.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.google.common.collect.BiMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.pinot.common.restlet.resources.TableTierInfo;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Gets the storage tier details from multiple servers in parallel. Only servers that respond successfully are
+ * included in the result; for those returning errors (HTTP error or otherwise), no entry is created in the map.
+ */
+public class ServerTableTierReader {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ServerTableTierReader.class);
+
+  private final Executor _executor;
+  private final HttpConnectionManager _connectionManager;
+
+  public ServerTableTierReader(Executor executor, HttpConnectionManager connectionManager) {
+    _executor = executor;
+    _connectionManager = connectionManager;
+  }
+
+  public Map<String, TableTierInfo> getTableTierInfoFromServers(BiMap<String, String> serverEndPoints,
+      String tableNameWithType, @Nullable String segmentName, int timeoutMs) {
+    int numServers = serverEndPoints.size();
+    LOGGER.info("Getting segment storage tiers from {} servers for table: {} with timeout: {}ms", numServers,
+        tableNameWithType, timeoutMs);
+    List<String> serverUrls = new ArrayList<>(numServers);
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String tierUri = endpoint;
+      if (segmentName == null) {
+        tierUri += "/tables/" + tableNameWithType + "/tiers";
+      } else {
+        tierUri += "/segments/" + tableNameWithType + "/" + segmentName + "/tiers";
+      }
+      serverUrls.add(tierUri);
+    }
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers);
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, tableNameWithType, false, timeoutMs);
+    Map<String, TableTierInfo> serverToTableTierInfoMap = new HashMap<>();
+    int failedParses = 0;
+    for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
+      try {
+        TableTierInfo tableTierInfo = JsonUtils.stringToObject(streamResponse.getValue(), TableTierInfo.class);
+        serverToTableTierInfoMap.put(streamResponse.getKey(), tableTierInfo);
+      } catch (IOException e) {
+        failedParses++;
+        LOGGER.error("Failed to parse server {} response", streamResponse.getKey(), e);
+      }
+    }
+    if (failedParses != 0) {
+      LOGGER.warn("Failed to parse {} / {} TableTierInfo responses from servers.", failedParses, serverUrls.size());
+    }
+    return serverToTableTierInfoMap;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableTierReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableTierReader.java
new file mode 100644
index 0000000000..67595f3a22
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableTierReader.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.google.common.collect.BiMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.restlet.resources.TableTierInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+
+
+/**
+ * Reads segment storage tiers from servers for the given table.
+ */
+public class TableTierReader {
+  // Server didn't respond although segment should be hosted there per ideal state.
+  private static final String ERROR_RESP_NO_RESPONSE = "NO_RESPONSE_FROM_SERVER";
+  // The segment is not listed in the server response, although segment should be hosted by the server per ideal state.
+  private static final String ERROR_RESP_MISSING_SEGMENT = "SEGMENT_MISSED_ON_SERVER";
+  // The segment is listed in the server response but it is not an immutable segment there, thus no tier info.
+  private static final String ERROR_RESP_NOT_IMMUTABLE = "NOT_IMMUTABLE_SEGMENT";
+
+  private final Executor _executor;
+  private final HttpConnectionManager _connectionManager;
+  private final PinotHelixResourceManager _helixResourceManager;
+
+  public TableTierReader(Executor executor, HttpConnectionManager connectionManager,
+      PinotHelixResourceManager helixResourceManager) {
+    _executor = executor;
+    _connectionManager = connectionManager;
+    _helixResourceManager = helixResourceManager;
+  }
+
+  /**
+   * Get the segment storage tiers for the given table, or for a single segment when segmentName is non-null. Servers
+   * that don't respond and segments missing from a response are recorded in the result to be checked by the caller.
+   *
+   * @param tableNameWithType table name with type
+   * @param timeoutMs timeout for reading segment tiers from servers
+   * @return details of segment storage tiers for the given table
+   */
+  public TableTierDetails getTableTierDetails(String tableNameWithType, @Nullable String segmentName, int timeoutMs)
+      throws InvalidConfigException {
+    Map<String, List<String>> serverToSegmentsMap = new HashMap<>();
+    if (segmentName == null) {
+      serverToSegmentsMap.putAll(_helixResourceManager.getServerToSegmentsMap(tableNameWithType));
+    } else {
+      List<String> segmentInList = Collections.singletonList(segmentName);
+      for (String server : _helixResourceManager.getServers(tableNameWithType, segmentName)) {
+        serverToSegmentsMap.put(server, segmentInList);
+      }
+    }
+    BiMap<String, String> endpoints = _helixResourceManager.getDataInstanceAdminEndpoints(serverToSegmentsMap.keySet());
+    ServerTableTierReader serverTableTierReader = new ServerTableTierReader(_executor, _connectionManager);
+    Map<String, TableTierInfo> serverToTableTierInfoMap =
+        serverTableTierReader.getTableTierInfoFromServers(endpoints, tableNameWithType, segmentName, timeoutMs);
+
+    TableTierDetails tableTierDetails = new TableTierDetails(tableNameWithType);
+    for (Map.Entry<String, List<String>> entry : serverToSegmentsMap.entrySet()) {
+      String server = entry.getKey();
+      List<String> expectedSegmentsOnServer = entry.getValue();
+      TableTierInfo tableTierInfo = serverToTableTierInfoMap.get(server);
+      for (String expectedSegment : expectedSegmentsOnServer) {
+        tableTierDetails._segmentTiers.computeIfAbsent(expectedSegment, (k) -> new HashMap<>()).put(server,
+            (tableTierInfo == null) ? ERROR_RESP_NO_RESPONSE : getSegmentTier(expectedSegment, tableTierInfo));
+      }
+    }
+    return tableTierDetails;
+  }
+
+  private static String getSegmentTier(String expectedSegment, TableTierInfo tableTierInfo) {
+    if (tableTierInfo.getMutableSegments().contains(expectedSegment)) {
+      return ERROR_RESP_NOT_IMMUTABLE;
+    }
+    if (!tableTierInfo.getSegmentTiers().containsKey(expectedSegment)) {
+      return ERROR_RESP_MISSING_SEGMENT;
+    }
+    // The value, i.e. tier, can be null.
+    return tableTierInfo.getSegmentTiers().get(expectedSegment);
+  }
+
+  // This class aggregates the TableTierInfo returned from multiple servers.
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class TableTierDetails {
+    private final String _tableName;
+    private final Map<String/*segment*/, Map<String/*server*/, String/*tier or err*/>> _segmentTiers = new HashMap<>();
+
+    TableTierDetails(String tableName) {
+      _tableName = tableName;
+    }
+
+    @JsonPropertyDescription("Name of table to look for segment storage tiers")
+    @JsonProperty("tableName")
+    public String getTableName() {
+      return _tableName;
+    }
+
+    @JsonPropertyDescription("Storage tiers of segments for the given table")
+    @JsonProperty("segmentTiers")
+    public Map<String, Map<String, String>> getSegmentTiers() {
+      return _segmentTiers;
+    }
+  }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableTierReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableTierReaderTest.java
new file mode 100644
index 0000000000..d71f34472a
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableTierReaderTest.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.sun.net.httpserver.HttpHandler;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.restlet.resources.TableTierInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableTierReader;
+import org.apache.pinot.controller.utils.FakeHttpServer;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.mockito.ArgumentMatchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+public class TableTierReaderTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableTierReaderTest.class);
+  private static final String URI_PATH_TABLE_TIERS = "/tables/";
+  private static final String URI_PATH_SEGMENT_TIERS = "/segments/";
+  private static final int TIMEOUT_MSEC = 3000;
+  private static final int EXTENDED_TIMEOUT_FACTOR = 100;
+
+  private final Executor _executor = Executors.newFixedThreadPool(1);
+  private final HttpConnectionManager _connectionManager = new MultiThreadedHttpConnectionManager();
+  private final Map<String, FakeSizeServer> _serverMap = new HashMap<>();
+  private PinotHelixResourceManager _helix;
+
+  @BeforeClass
+  public void setUp()
+      throws IOException {
+    _helix = mock(PinotHelixResourceManager.class);
+
+    int counter = 0;
+
+    // following servers are configured to get table tiers
+    // server0 - all good
+    Map<String, String> segTierMap = new HashMap<>();
+    segTierMap.put("seg01", null);
+    segTierMap.put("seg02", "someTier");
+    Set<String> muSegs = Collections.singleton("muSeg01");
+    FakeSizeServer s = new FakeSizeServer(segTierMap, muSegs);
+    s.start(URI_PATH_TABLE_TIERS, createHandler(200, s._segTierMap, muSegs, 0));
+    _serverMap.put(serverName(counter), s);
+    counter++;
+
+    // server1 - all good
+    s = new FakeSizeServer(Collections.singletonMap("seg01", null));
+    s.start(URI_PATH_TABLE_TIERS, createHandler(200, s._segTierMap, Collections.emptySet(), 0));
+    _serverMap.put(serverName(counter), s);
+    counter++;
+
+    // server2 - always 404
+    s = new FakeSizeServer(Collections.singletonMap("seg02", "someTier"));
+    s.start(URI_PATH_TABLE_TIERS, createHandler(404, s._segTierMap, Collections.emptySet(), 0));
+    _serverMap.put(serverName(counter), s);
+    counter++;
+
+    // server3 - empty server
+    s = new FakeSizeServer(Collections.emptyMap());
+    s.start(URI_PATH_TABLE_TIERS, createHandler(200, s._segTierMap, Collections.emptySet(), 0));
+    _serverMap.put(serverName(counter), s);
+    counter++;
+
+    // server4 - missing seg03
+    segTierMap = new HashMap<>();
+    segTierMap.put("seg02", "someTier");
+    segTierMap.put("seg03", "someTier");
+    s = new FakeSizeServer(segTierMap);
+    segTierMap = new HashMap<>(segTierMap);
+    segTierMap.remove("seg03");
+    s.start(URI_PATH_TABLE_TIERS, createHandler(200, segTierMap, Collections.emptySet(), 0));
+    _serverMap.put(serverName(counter), s);
+    counter++;
+
+    // server5 - timing out server
+    s = new FakeSizeServer(Collections.singletonMap("seg04", "someTier"));
+    s.start(URI_PATH_TABLE_TIERS,
+        createHandler(200, s._segTierMap, Collections.emptySet(), TIMEOUT_MSEC * EXTENDED_TIMEOUT_FACTOR));
+    _serverMap.put(serverName(counter), s);
+    counter++;
+
+    // following servers are configured to get segment tiers
+    // server6 - all good for segX
+    segTierMap = new HashMap<>();
+    segTierMap.put("segX", "someTier");
+    muSegs = Collections.singleton("segY");
+    s = new FakeSizeServer(segTierMap, muSegs);
+    s.start(URI_PATH_SEGMENT_TIERS, createHandler(200, s._segTierMap, muSegs, 0));
+    _serverMap.put(serverName(counter), s);
+    counter++;
+
+    // server7 - missing segX
+    s = new FakeSizeServer(segTierMap);
+    segTierMap = new HashMap<>(segTierMap);
+    segTierMap.remove("segX");
+    s.start(URI_PATH_SEGMENT_TIERS, createHandler(200, segTierMap, Collections.emptySet(), 0));
+    _serverMap.put(serverName(counter), s);
+    counter++;
+
+    // server8 - 404
+    s = new FakeSizeServer(Collections.singletonMap("segX", null));
+    s.start(URI_PATH_TABLE_TIERS, createHandler(404, s._segTierMap, Collections.emptySet(), 0));
+    _serverMap.put(serverName(counter), s);
+    counter++;
+
+    // server9 - timing out
+    s = new FakeSizeServer(Collections.singletonMap("segX", null));
+    s.start(URI_PATH_TABLE_TIERS,
+        createHandler(200, s._segTierMap, Collections.emptySet(), TIMEOUT_MSEC * EXTENDED_TIMEOUT_FACTOR));
+    _serverMap.put(serverName(counter), s);
+    counter++;
+  }
+
+  @AfterClass
+  public void tearDown() {
+    for (Map.Entry<String, FakeSizeServer> fakeServerEntry : _serverMap.entrySet()) {
+      fakeServerEntry.getValue().stop();
+    }
+  }
+
+  private HttpHandler createHandler(int status, Map<String, String> segTierMap, Set<String> mutableSegments,
+      int sleepTimeMs) {
+    return httpExchange -> {
+      if (sleepTimeMs > 0) {
+        try {
+          Thread.sleep(sleepTimeMs);
+        } catch (InterruptedException e) {
+          LOGGER.info("Handler interrupted during sleep");
+        }
+      }
+      TableTierInfo tableInfo = new TableTierInfo("myTable", segTierMap, mutableSegments);
+      String json = JsonUtils.objectToString(tableInfo);
+      httpExchange.sendResponseHeaders(status, json.length());
+      OutputStream responseBody = httpExchange.getResponseBody();
+      responseBody.write(json.getBytes());
+      responseBody.close();
+    };
+  }
+
+  private String serverName(int index) {
+    return "server" + index;
+  }
+
+  private static class FakeSizeServer extends FakeHttpServer {
+    Set<String> _mutableSegments;
+    Map<String, String> _segTierMap;
+
+    FakeSizeServer(Map<String, String> segTierMap) {
+      this(segTierMap, Collections.emptySet());
+    }
+
+    FakeSizeServer(Map<String, String> segTierMap, Set<String> mutableSegments) {
+      _segTierMap = segTierMap;
+      _mutableSegments = mutableSegments;
+    }
+  }
+
+  private Map<String, List<String>> subsetOfServerSegments(String... servers) {
+    Map<String, List<String>> subset = new HashMap<>();
+    for (String server : servers) {
+      FakeSizeServer fakeSvr = _serverMap.get(server);
+      ArrayList<String> segmentsOnServer = new ArrayList<>(fakeSvr._segTierMap.keySet());
+      segmentsOnServer.addAll(fakeSvr._mutableSegments);
+      subset.put(server, segmentsOnServer);
+    }
+    return subset;
+  }
+
+  private BiMap<String, String> serverEndpoints(String... servers) {
+    BiMap<String, String> endpoints = HashBiMap.create(servers.length);
+    for (String server : servers) {
+      endpoints.put(server, _serverMap.get(server)._endpoint);
+    }
+    return endpoints;
+  }
+
+  private TableTierReader.TableTierDetails testRunner(final String[] servers, String tableName, String segmentName)
+      throws InvalidConfigException {
+    when(_helix.getServerToSegmentsMap(ArgumentMatchers.anyString()))
+        .thenAnswer(invocationOnMock -> subsetOfServerSegments(servers));
+    if (segmentName != null) {
+      when(_helix.getServers(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()))
+          .thenAnswer(invocationOnMock -> new HashSet<>(Arrays.asList(servers)));
+    }
+    when(_helix.getDataInstanceAdminEndpoints(ArgumentMatchers.<String>anySet()))
+        .thenAnswer(invocationOnMock -> serverEndpoints(servers));
+    TableTierReader reader = new TableTierReader(_executor, _connectionManager, _helix);
+    return reader.getTableTierDetails(tableName, segmentName, TIMEOUT_MSEC);
+  }
+
+  @Test
+  public void testGetTableTierInfoAllSuccess()
+      throws InvalidConfigException {
+    final String[] servers = {"server0", "server1"};
+    TableTierReader.TableTierDetails tableTierDetails = testRunner(servers, "myTable_OFFLINE", null);
+    assertEquals(tableTierDetails.getSegmentTiers().size(), 3);
+    Map<String, String> tiersByServer = tableTierDetails.getSegmentTiers().get("seg01");
+    assertEquals(tiersByServer.size(), 2);
+    assertNull(tiersByServer.get("server0"));
+    assertNull(tiersByServer.get("server1"));
+    tiersByServer = tableTierDetails.getSegmentTiers().get("seg02");
+    assertEquals(tiersByServer.size(), 1);
+    assertEquals(tiersByServer.get("server0"), "someTier");
+    tiersByServer = tableTierDetails.getSegmentTiers().get("muSeg01");
+    assertEquals(tiersByServer.size(), 1);
+    assertEquals(tiersByServer.get("server0"), "NOT_IMMUTABLE_SEGMENT");
+  }
+
+  @Test
+  public void testGetTableTierInfoAllErrors()
+      throws InvalidConfigException {
+    String[] servers = {"server2", "server5"};
+    TableTierReader.TableTierDetails tableTierDetails = testRunner(servers, "myTable_OFFLINE", null);
+    assertEquals(tableTierDetails.getSegmentTiers().size(), 2);
+    Map<String, String> tiersByServer = tableTierDetails.getSegmentTiers().get("seg02");
+    assertEquals(tiersByServer.size(), 1);
+    assertEquals(tiersByServer.get("server2"), "NO_RESPONSE_FROM_SERVER");
+    tiersByServer = tableTierDetails.getSegmentTiers().get("seg04");
+    assertEquals(tiersByServer.size(), 1);
+    assertEquals(tiersByServer.get("server5"), "NO_RESPONSE_FROM_SERVER");
+  }
+
+  @Test
+  public void testGetTableTierInfoFromAllServers()
+      throws InvalidConfigException {
+    final String[] servers = {"server0", "server1", "server2", "server3", "server4", "server5"};
+    TableTierReader.TableTierDetails tableTierDetails = testRunner(servers, "myTable_OFFLINE", null);
+
+    assertEquals(tableTierDetails.getSegmentTiers().size(), 5);
+    Map<String, String> tiersByServer = tableTierDetails.getSegmentTiers().get("seg01");
+    assertEquals(tiersByServer.size(), 2);
+    assertNull(tiersByServer.get("server0"));
+    assertNull(tiersByServer.get("server1"));
+
+    tiersByServer = tableTierDetails.getSegmentTiers().get("seg02");
+    assertEquals(tiersByServer.size(), 3);
+    assertEquals(tiersByServer.get("server0"), "someTier");
+    assertEquals(tiersByServer.get("server2"), "NO_RESPONSE_FROM_SERVER");
+    assertEquals(tiersByServer.get("server4"), "someTier");
+
+    tiersByServer = tableTierDetails.getSegmentTiers().get("seg03");
+    assertEquals(tiersByServer.size(), 1);
+    assertEquals(tiersByServer.get("server4"), "SEGMENT_MISSED_ON_SERVER");
+
+    tiersByServer = tableTierDetails.getSegmentTiers().get("seg04");
+    assertEquals(tiersByServer.size(), 1);
+    assertEquals(tiersByServer.get("server5"), "NO_RESPONSE_FROM_SERVER");
+
+    tiersByServer = tableTierDetails.getSegmentTiers().get("muSeg01");
+    assertEquals(tiersByServer.size(), 1);
+    assertEquals(tiersByServer.get("server0"), "NOT_IMMUTABLE_SEGMENT");
+  }
+
+  @Test
+  public void testGetSegmentTierInfoFromAllServers()
+      throws InvalidConfigException {
+    final String[] servers = {"server6", "server7", "server8", "server9"};
+    TableTierReader.TableTierDetails tableTierDetails = testRunner(servers, "myTable_OFFLINE", "segX");
+    assertEquals(tableTierDetails.getSegmentTiers().size(), 1);
+    Map<String, String> tiersByServer = tableTierDetails.getSegmentTiers().get("segX");
+    assertEquals(tiersByServer.size(), 4);
+    assertEquals(tiersByServer.get("server6"), "someTier");
+    assertEquals(tiersByServer.get("server7"), "SEGMENT_MISSED_ON_SERVER");
+    assertEquals(tiersByServer.get("server8"), "NO_RESPONSE_FROM_SERVER");
+    assertEquals(tiersByServer.get("server9"), "NO_RESPONSE_FROM_SERVER");
+    // Check a mutable segment segY.
+    tableTierDetails = testRunner(new String[]{"server6"}, "myTable_OFFLINE", "segY");
+    assertEquals(tableTierDetails.getSegmentTiers().get("segY").get("server6"), "NOT_IMMUTABLE_SEGMENT");
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TableTierResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TableTierResource.java
new file mode 100644
index 0000000000..47df55f8d7
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TableTierResource.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.common.restlet.resources.ResourceUtils;
+import org.apache.pinot.common.restlet.resources.TableTierInfo;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.server.starter.ServerInstance;
+
+import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ * Server-side API reporting storage tiers of immutable segments (per table or per segment) on the requested server.
+ */
+@Api(tags = "Table", authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
+@Path("/")
+public class TableTierResource {
+
+  @Inject
+  private ServerInstance _serverInstance;
+
+  /** Returns, as JSON, the tiers of the table's immutable segments and the names of its remaining segments. */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableNameWithType}/tiers")
+  @ApiOperation(value = "Get storage tiers of immutable segments of the given table", notes = "Get storage tiers of "
+      + "immutable segments of the given table")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
+      @ApiResponse(code = 404, message = "Table not found")
+  })
+  public String getTableTiers(@ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType")
+      String tableNameWithType)
+      throws WebApplicationException {
+    TableDataManager tableDataManager = getTableDataManagerOrThrow(tableNameWithType);
+    Set<String> mutableSegments = new HashSet<>();
+    Map<String, String> segmentTiers = new HashMap<>();
+    List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireAllSegments();
+    try {
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        recordTier(segmentDataManager, segmentTiers, mutableSegments);
+      }
+    } finally {
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        tableDataManager.releaseSegment(segmentDataManager);
+      }
+    }
+    TableTierInfo tableTierInfo = new TableTierInfo(tableDataManager.getTableName(), segmentTiers, mutableSegments);
+    return ResourceUtils.convertToJsonString(tableTierInfo);
+  }
+
+  /** Returns, as JSON, the tier of the named segment, or its name under mutable segments if not immutable. */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/segments/{tableNameWithType}/{segmentName}/tiers")
+  @ApiOperation(value = "Get storage tiers of the immutable segment of the given table", notes = "Get storage tiers "
+      + "of the immutable segment of the given table")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
+      @ApiResponse(code = 404, message = "Table or segment not found")
+  })
+  public String getTableSegmentTiers(
+      @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType")
+          String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName)
+      throws WebApplicationException {
+    segmentName = URIUtils.decode(segmentName);
+    TableDataManager tableDataManager = getTableDataManagerOrThrow(tableNameWithType);
+    SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+    if (segmentDataManager == null) {
+      throw new WebApplicationException(
+          String.format("Segment: %s is not found in table: %s", segmentName, tableNameWithType),
+          Response.Status.NOT_FOUND);
+    }
+    Set<String> mutableSegments = new HashSet<>();
+    Map<String, String> segmentTiers = new HashMap<>();
+    try {
+      recordTier(segmentDataManager, segmentTiers, mutableSegments);
+    } finally {
+      tableDataManager.releaseSegment(segmentDataManager);
+    }
+    TableTierInfo tableTierInfo = new TableTierInfo(tableDataManager.getTableName(), segmentTiers, mutableSegments);
+    return ResourceUtils.convertToJsonString(tableTierInfo);
+  }
+
+  /** Resolves the table's data manager; throws 500 without an instance data manager, 404 for an unknown table. */
+  private TableDataManager getTableDataManagerOrThrow(String tableNameWithType) {
+    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
+    if (instanceDataManager == null) {
+      throw new WebApplicationException("Invalid server initialization", Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    TableDataManager tableDataManager = instanceDataManager.getTableDataManager(tableNameWithType);
+    if (tableDataManager == null) {
+      throw new WebApplicationException("Table: " + tableNameWithType + " is not found", Response.Status.NOT_FOUND);
+    }
+    return tableDataManager;
+  }
+
+  /** Records the tier of an immutable segment, or just the name of any other segment, into the given collections. */
+  private static void recordTier(SegmentDataManager manager, Map<String, String> tiers, Set<String> mutableSegments) {
+    if (manager instanceof ImmutableSegmentDataManager) {
+      ImmutableSegment immutableSegment = (ImmutableSegment) manager.getSegment();
+      tiers.put(immutableSegment.getSegmentName(), immutableSegment.getTier());
+    } else {
+      mutableSegments.add(manager.getSegmentName());
+    }
+  }
+}
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TableTierResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TableTierResourceTest.java
new file mode 100644
index 0000000000..e6d53587eb
--- /dev/null
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TableTierResourceTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api;
+
+import java.util.Collections;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.common.restlet.resources.TableTierInfo;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/** Exercises the server-side tier endpoints for tables and for individual segments. */
+public class TableTierResourceTest extends BaseResourceTest {
+  @Test
+  public void testTableNotFound() {
+    // An unknown table is expected to produce a 404 response.
+    int status = _webTarget.path("tables/unknownTable/tiers").request().get(Response.class).getStatus();
+    assertEquals(status, Response.Status.NOT_FOUND.getStatusCode());
+  }
+
+  @Test
+  public void testSegmentNotFound() {
+    // An unknown segment of the offline table is expected to produce a 404 response.
+    String path = "segments/" + TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME) + "/unknownSegment/tiers";
+    int status = _webTarget.path(path).request().get(Response.class).getStatus();
+    assertEquals(status, Response.Status.NOT_FOUND.getStatusCode());
+  }
+
+  @Test
+  public void testTableTierInfo() {
+    String realtimeTable = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
+    verifyTableTierInfo("/tables/" + realtimeTable + "/tiers", realtimeTable, _realtimeIndexSegments.get(0));
+
+    String offlineTable = TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
+    verifyTableTierInfo("/tables/" + offlineTable + "/tiers", offlineTable, _offlineIndexSegments.get(0));
+  }
+
+  @Test
+  public void testTableSegmentTierInfo() {
+    String realtimeTable = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
+    ImmutableSegment realtimeSegment = _realtimeIndexSegments.get(0);
+    String realtimePath = String.format("/segments/%s/%s/tiers", realtimeTable, realtimeSegment.getSegmentName());
+    verifyTableTierInfo(realtimePath, realtimeTable, realtimeSegment);
+
+    String offlineTable = TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
+    ImmutableSegment offlineSegment = _offlineIndexSegments.get(0);
+    String offlinePath = String.format("/segments/%s/%s/tiers", offlineTable, offlineSegment.getSegmentName());
+    verifyTableTierInfo(offlinePath, offlineTable, offlineSegment);
+  }
+
+  /** Fetches the tier info at the path; checks the table name and that the only entry is the segment's tier. */
+  private void verifyTableTierInfo(String requestPath, String expectedTableName, ImmutableSegment segment) {
+    TableTierInfo actual = _webTarget.path(requestPath).request().get(TableTierInfo.class);
+    assertEquals(actual.getTableName(), expectedTableName);
+    assertEquals(actual.getSegmentTiers().size(), 1);
+    assertEquals(actual.getSegmentTiers(), Collections.singletonMap(segment.getSegmentName(), segment.getTier()));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org