You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2022/07/14 18:11:35 UTC

[pinot] branch master updated: List tables by taskType (#9049)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a2cde564f6 List tables by taskType (#9049)
a2cde564f6 is described below

commit a2cde564f6821350854a1e78935354fed812d3b4
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Jul 14 11:11:30 2022 -0700

    List tables by taskType (#9049)
    
    * List tables by taskType
    
    * Fix test states
    
    * Regression check older java
---
 .../api/resources/PinotTableRestletResource.java   |  57 +++++++----
 .../api/PinotTableRestletResourceTest.java         | 112 +++++++++++++++++++++
 2 files changed, 148 insertions(+), 21 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index dfd3a11ac7..b8239aa532 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -36,11 +36,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
@@ -60,6 +63,7 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
@@ -243,45 +247,55 @@ public class PinotTableRestletResource {
   @Path("/tables")
   @ApiOperation(value = "Lists all tables in cluster", notes = "Lists all tables in cluster")
   public String listTables(@ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
       @ApiParam(value = "name|creationTime|lastModifiedTime") @QueryParam("sortType") String sortTypeStr,
       @ApiParam(value = "true|false") @QueryParam("sortAsc") @DefaultValue("true") boolean sortAsc) {
     try {
-      List<String> tableNames;
       TableType tableType = null;
       if (tableTypeStr != null) {
         tableType = TableType.valueOf(tableTypeStr.toUpperCase());
       }
       SortType sortType = sortTypeStr != null ? SortType.valueOf(sortTypeStr.toUpperCase()) : SortType.NAME;
 
-      if (tableType == null) {
-        if (sortType == SortType.NAME) {
-          tableNames = _pinotHelixResourceManager.getAllRawTables();
-        } else {
-          // NOTE: Need to read actual table names (with type suffix) when not sorting on name because we need to read
-          //       the stats for the ZK records
-          tableNames = _pinotHelixResourceManager.getAllTables();
-        }
-      } else {
-        if (tableType == TableType.REALTIME) {
-          tableNames = _pinotHelixResourceManager.getAllRealtimeTables();
-        } else {
-          tableNames = _pinotHelixResourceManager.getAllOfflineTables();
+      List<String> tableNamesWithType = tableType == null ? _pinotHelixResourceManager.getAllTables()
+          : (tableType == TableType.REALTIME ? _pinotHelixResourceManager.getAllRealtimeTables()
+              : _pinotHelixResourceManager.getAllOfflineTables());
+
+      if (StringUtils.isNotBlank(taskType)) {
+        Set<String> tableNamesForTaskType = new HashSet<>();
+        for (String tableNameWithType : tableNamesWithType) {
+          TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+          if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
+              .isTaskTypeEnabled(taskType)) {
+            tableNamesForTaskType.add(tableNameWithType);
+          }
         }
+        tableNamesWithType.retainAll(tableNamesForTaskType);
       }
 
+      List<String> tableNames;
       if (sortType == SortType.NAME) {
-        tableNames.sort(sortAsc ? null : Comparator.reverseOrder());
+        if (tableType == null && StringUtils.isBlank(taskType)) {
+          List<String> rawTableNames = tableNamesWithType.stream().map(TableNameBuilder::extractRawTableName).distinct()
+              .collect(Collectors.toList());
+          rawTableNames.sort(sortAsc ? null : Comparator.reverseOrder());
+          tableNames = rawTableNames;
+        } else {
+          tableNamesWithType.sort(sortAsc ? null : Comparator.reverseOrder());
+          tableNames = tableNamesWithType;
+        }
       } else {
         int sortFactor = sortAsc ? 1 : -1;
         ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
-        int numTables = tableNames.size();
+        int numTables = tableNamesWithType.size();
         List<String> zkPaths = new ArrayList<>(numTables);
-        for (String tableNameWithType : tableNames) {
+        for (String tableNameWithType : tableNamesWithType) {
           zkPaths.add(ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType));
         }
         Stat[] stats = propertyStore.getStats(zkPaths, AccessOption.PERSISTENT);
         for (int i = 0; i < numTables; i++) {
-          Preconditions.checkState(stats[i] != null, "Failed to read ZK stats for table: %s", tableNames.get(i));
+          Preconditions.checkState(stats[i] != null, "Failed to read ZK stats for table: %s",
+              tableNamesWithType.get(i));
         }
         IntComparator comparator;
         if (sortType == SortType.CREATIONTIME) {
@@ -295,11 +309,12 @@ public class PinotTableRestletResource {
           stats[i] = stats[j];
           stats[j] = tempStat;
 
-          String tempTableName = tableNames.get(i);
-          tableNames.set(i, tableNames.get(j));
-          tableNames.set(j, tempTableName);
+          String tempTableName = tableNamesWithType.get(i);
+          tableNamesWithType.set(i, tableNamesWithType.get(j));
+          tableNamesWithType.set(j, tempTableName);
         };
         Arrays.quickSort(0, numTables, comparator, swapper);
+       tableNames = tableNamesWithType;
       }
 
       return JsonUtils.newObjectNode().set("tables", JsonUtils.objectToJsonNode(tableNames)).toString();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 5f98e864a6..5345c935c1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -18,12 +18,16 @@
  */
 package org.apache.pinot.controller.api;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.pinot.controller.api.resources.TableAndSchemaConfig;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
@@ -354,6 +358,114 @@ public class PinotTableRestletResourceTest {
     }
   }
 
+  private void deleteAllTables()
+      throws IOException {
+    List<String> tables = getTableNames(_createTableUrl + "?type=offline");
+    tables.addAll(getTableNames(_createTableUrl + "?type=realtime"));
+    for (String tableName : tables) {
+      ControllerTest.sendDeleteRequest(TEST_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+    }
+  }
+
+  @Test
+  public void testListTables()
+      throws Exception {
+    deleteAllTables();
+    List<String> tables = getTableNames(_createTableUrl);
+    Assert.assertTrue(tables.isEmpty());
+
+    // post 2 offline, 1 realtime
+    String rawTableName1 = "pqr";
+    TableConfig offlineTableConfig1 = _offlineBuilder.setTableName(rawTableName1).build();
+    ControllerTest.sendPostRequest(_createTableUrl, offlineTableConfig1.toJsonString());
+    TEST_INSTANCE.addDummySchema(rawTableName1);
+    TableConfig realtimeTableConfig1 = _realtimeBuilder.setTableName(rawTableName1).setNumReplicas(2).build();
+    ControllerTest.sendPostRequest(_createTableUrl, realtimeTableConfig1.toJsonString());
+    String rawTableName2 = "abc";
+    TableConfig offlineTableConfig2 = _offlineBuilder.setTableName(rawTableName2).build();
+    ControllerTest.sendPostRequest(_createTableUrl, offlineTableConfig2.toJsonString());
+
+    // list
+    tables = getTableNames(_createTableUrl);
+    Assert.assertEquals(tables, Lists.newArrayList("abc", "pqr"));
+    tables = getTableNames(_createTableUrl + "?sortAsc=false");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr", "abc"));
+    tables = getTableNames(_createTableUrl + "?sortType=creationTime");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "pqr_REALTIME", "abc_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?sortType=creationTime&sortAsc=false");
+    Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_REALTIME", "pqr_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?sortType=lastModifiedTime");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "pqr_REALTIME", "abc_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?sortType=lastModifiedTime&sortAsc=false");
+    Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_REALTIME", "pqr_OFFLINE"));
+
+    // type
+    tables = getTableNames(_createTableUrl + "?type=realtime");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr_REALTIME"));
+    tables = getTableNames(_createTableUrl + "?type=offline");
+    Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?type=offline&sortAsc=false");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?type=offline&sortType=creationTime");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?type=offline&sortType=creationTime&sortAsc=false");
+    Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?type=offline&sortType=lastModifiedTime");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?type=offline&sortType=lastModifiedTime&sortAsc=false");
+    Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_OFFLINE"));
+
+    // update taskType for abc_OFFLINE
+    Map<String, Map<String, String>> taskTypeMap = new HashMap<>();
+    taskTypeMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, new HashMap<>());
+    offlineTableConfig2.setTaskConfig(new TableTaskConfig(taskTypeMap));
+    JsonUtils.stringToJsonNode(ControllerTest.sendPutRequest(
+        TEST_INSTANCE.getControllerRequestURLBuilder().forUpdateTableConfig(rawTableName2),
+        offlineTableConfig2.toJsonString()));
+    // update for pqr_REALTIME
+    taskTypeMap = new HashMap<>();
+    taskTypeMap.put(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
+    realtimeTableConfig1.setTaskConfig(new TableTaskConfig(taskTypeMap));
+    JsonUtils.stringToJsonNode(ControllerTest.sendPutRequest(
+        TEST_INSTANCE.getControllerRequestURLBuilder().forUpdateTableConfig(rawTableName1),
+        realtimeTableConfig1.toJsonString()));
+
+    // list lastModified, taskType
+    tables = getTableNames(_createTableUrl + "?sortType=lastModifiedTime");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE", "pqr_REALTIME"));
+    tables = getTableNames(_createTableUrl + "?sortType=lastModifiedTime&sortAsc=false");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr_REALTIME", "abc_OFFLINE", "pqr_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?taskType=MergeRollupTask");
+    Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?taskType=MergeRollupTask&type=realtime");
+    Assert.assertTrue(tables.isEmpty());
+    tables = getTableNames(_createTableUrl + "?taskType=RealtimeToOfflineSegmentsTask");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr_REALTIME"));
+
+    // update taskType for pqr_OFFLINE
+    taskTypeMap = new HashMap<>();
+    taskTypeMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, new HashMap<>());
+    offlineTableConfig1.setTaskConfig(new TableTaskConfig(taskTypeMap));
+    JsonUtils.stringToJsonNode(ControllerTest.sendPutRequest(
+        TEST_INSTANCE.getControllerRequestURLBuilder().forUpdateTableConfig(rawTableName1),
+        offlineTableConfig1.toJsonString()));
+
+    // list lastModified, taskType
+    tables = getTableNames(_createTableUrl + "?taskType=MergeRollupTask");
+    Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?taskType=MergeRollupTask&sortAsc=false");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE"));
+    tables = getTableNames(_createTableUrl + "?taskType=MergeRollupTask&sortType=creationTime");
+    Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE"));
+  }
+
+  private List<String> getTableNames(String url)
+      throws IOException {
+    JsonNode tablesJson = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(url)).get("tables");
+    return JsonUtils.jsonNodeToObject(tablesJson, new TypeReference<List<String>>() {
+    });
+  }
+
   @Test(expectedExceptions = IOException.class)
   public void rebalanceNonExistentTable()
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org