Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/04/26 02:15:24 UTC

[GitHub] [druid] imply-cheddar commented on a diff in pull request #12404: Optimize overlord GET /tasks memory usage

imply-cheddar commented on code in PR #12404:
URL: https://github.com/apache/druid/pull/12404#discussion_r858145172


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java:
##########
@@ -170,6 +172,25 @@ public List<Task> getActiveTasksByDatasource(String datasource)
     return listBuilder.build();
   }
 
+  private TaskStatusPlus toTaskStatusPlus(TaskInfo<Task, TaskStatus> taskInfo)

Review Comment:
   Why not make this a static method on `TaskStatusPlus`?  Is it really important for it to be private to this class?
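
   For illustration, a minimal sketch of the static-factory shape suggested here. `TaskInfoSketch`, `TaskStatusPlusSketch`, and `fromTaskInfo` are hypothetical, heavily simplified stand-ins, not the real Druid classes or signatures; the point is only that the conversion lives on the produced type, so storage classes and tests can share one copy.

```java
// Hypothetical, simplified stand-in for TaskInfo; the real class carries more fields.
final class TaskInfoSketch
{
  final String id;
  final String groupId;     // null when the task payload is unavailable
  final String type;        // null when the task payload is unavailable
  final String dataSource;

  TaskInfoSketch(String id, String groupId, String type, String dataSource)
  {
    this.id = id;
    this.groupId = groupId;
    this.type = type;
    this.dataSource = dataSource;
  }
}

// Hypothetical, simplified stand-in for TaskStatusPlus.
final class TaskStatusPlusSketch
{
  final String id;
  final String groupId;
  final String type;
  final String dataSource;

  private TaskStatusPlusSketch(String id, String groupId, String type, String dataSource)
  {
    this.id = id;
    this.groupId = groupId;
    this.type = type;
    this.dataSource = dataSource;
  }

  // Static factory: any caller can convert without keeping a private copy of the helper.
  static TaskStatusPlusSketch fromTaskInfo(TaskInfoSketch info)
  {
    return new TaskStatusPlusSketch(info.id, info.groupId, info.type, info.dataSource);
  }
}
```

   A caller would then write `TaskStatusPlusSketch.fromTaskInfo(info)` wherever the private helper is used today.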



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -745,87 +745,57 @@ private List<TaskStatusPlus> getTasks(
 
     if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) {
       // We are interested in only those tasks which are in taskRunner.
-      taskInfoStreamFromTaskStorage = taskInfoStreamFromTaskStorage
-          .filter(info -> runnerWorkItems.containsKey(info.getId()));
+      taskStatusPlusStream = taskStatusPlusStream
+          .filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId()));
     }
-    final List<TaskInfo<Task, TaskStatus>> taskInfoFromTaskStorage = taskInfoStreamFromTaskStorage
-        .collect(Collectors.toList());
+    final List<TaskStatusPlus> taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList());
 
     // Separate complete and active tasks from taskStorage.
     // Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType.
-    final List<TaskInfo<Task, TaskStatus>> completeTaskInfoFromTaskStorage = new ArrayList<>();
-    final List<TaskInfo<Task, TaskStatus>> activeTaskInfoFromTaskStorage = new ArrayList<>();
-    for (TaskInfo<Task, TaskStatus> info : taskInfoFromTaskStorage) {
-      if (info.getStatus().isComplete()) {
-        completeTaskInfoFromTaskStorage.add(info);
+    final List<TaskStatusPlus> completeTaskStatusPlusList = new ArrayList<>();
+    final List<TaskStatusPlus> activeTaskStatusPlusList = new ArrayList<>();
+    for (TaskStatusPlus statusPlus : taskStatusPlusList) {
+      if (statusPlus.getStatusCode().isComplete()) {
+        completeTaskStatusPlusList.add(statusPlus);
       } else {
-        activeTaskInfoFromTaskStorage.add(info);
+        activeTaskStatusPlusList.add(statusPlus);
       }
     }
 
-    final List<TaskStatusPlus> statuses = new ArrayList<>();
-    completeTaskInfoFromTaskStorage.forEach(taskInfo -> statuses.add(
-        new TaskStatusPlus(
-            taskInfo.getId(),
-            taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
-            taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
-            taskInfo.getCreatedTime(),
-            DateTimes.EPOCH,
-            taskInfo.getStatus().getStatusCode(),
-            RunnerTaskState.NONE,
-            taskInfo.getStatus().getDuration(),
-            taskInfo.getStatus().getLocation(),
-            taskInfo.getDataSource(),
-            taskInfo.getStatus().getErrorMsg()
-        )
-    ));
+    final List<TaskStatusPlus> taskStatuses = new ArrayList<>(completeTaskStatusPlusList);
 
-    activeTaskInfoFromTaskStorage.forEach(taskInfo -> {
-      final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(taskInfo.getId());
+    activeTaskStatusPlusList.forEach(statusPlus -> {
+      final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId());
       if (runnerWorkItem == null) {
         // a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner.
         if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) {
-          statuses.add(
-              new TaskStatusPlus(
-                  taskInfo.getId(),
-                  taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
-                  taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
-                  taskInfo.getCreatedTime(),
-                  DateTimes.EPOCH,
-                  taskInfo.getStatus().getStatusCode(),
-                  RunnerTaskState.WAITING,
-                  taskInfo.getStatus().getDuration(),
-                  taskInfo.getStatus().getLocation(),
-                  taskInfo.getDataSource(),
-                  taskInfo.getStatus().getErrorMsg()
-              )
-          );
+          taskStatuses.add(statusPlus);
         }
       } else {
         if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) {
-          statuses.add(
+          taskStatuses.add(
               new TaskStatusPlus(

Review Comment:
   If it's already a TaskStatusPlus, why do you need to do all of this?  It looks like this is setting createdTime and insertion time, are those not already set?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {

Review Comment:
   Why not add both of the columns at the same time?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
+              }
+              if (!tableContainsColumn(handle, tableName, "group_id")) {
+                log.info("Adding column: group_id to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
+              }
+              return null;
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception altering table");
+    }
+  }
+
+  @Override
+  public boolean migrateTaskTable()
+  {
+    final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get();
+    final String entryType = tablesConfig.getTaskEntryType();
+    final String tableName = tablesConfig.getEntryTable(entryType);
+    return migrateTaskTable(tableName);
+  }
+
+  public boolean migrateTaskTable(String tableName)
+  {
+    log.info("Populate fields task and group_id of task entry table [%s] from payload", tableName);
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle) throws SQLException, IOException
+            {
+              ObjectMapper objectMapper = new ObjectMapper();

Review Comment:
   `new ObjectMapper()` should never exist in the Druid code.  It's almost always incorrect and if you find yourself needing to do it because you have no other way of getting an ObjectMapper, the code structure is likely to blame.
   
   In this case, the fact that the `MetadataConnector` is dealing with these things is to blame.  `SQLMetadataStorageActionHandler` already has an object mapper that it uses.  The logic for this migration should actually exist inside of that class or `MetadataTaskStorage` and *not* in here.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -311,7 +322,104 @@ public List<TaskInfo<EntryType, StatusType>> getTaskInfos(
     );
   }
 
-  protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
+  @Override
+  public List<TaskStatusPlus> getTaskStatusPlusList(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource,
+      boolean fetchPayload

Review Comment:
   This boolean here about fetching payload is weird, especially on a public method.  This class should already know if it should be using the payload or not.  Once you move the migration logic into this class, you won't need this anymore.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java:
##########
@@ -1782,6 +1783,25 @@ private TaskInfo<Task, TaskStatus> createTaskInfo(
     );
   }
 
+  private TaskStatusPlus toTaskStatusPlus(TaskInfo<Task, TaskStatus> taskInfo)

Review Comment:
   This method shows up again?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java:
##########
@@ -233,6 +254,35 @@ public List<TaskInfo<Task, TaskStatus>> getTaskInfos(
     return tasks;
   }
 
+  @Override
+  public List<TaskStatusPlus> getTaskStatusPlusList(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String datasource
+  )
+  {
+    final List<TaskStatusPlus> tasks = new ArrayList<>();
+    taskLookups.forEach((type, lookup) -> {
+      if (type == TaskLookupType.COMPLETE) {
+        CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) lookup;
+        tasks.addAll(
+            getRecentlyCreatedAlreadyFinishedTaskInfo(
+                completeTaskLookup.hasTaskCreatedTimeFilter()
+                ? completeTaskLookup
+                : completeTaskLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold()),
+                datasource
+            ).stream()
+             .map(taskInfo -> toTaskStatusPlus(taskInfo))

Review Comment:
   nit: might as well just pass `toTaskStatusPlus` as a method reference; it can stand in for the lambda on its own.
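
   A small sketch of the difference, assuming a hypothetical static helper `toStatusLabel` in place of the PR's `toTaskStatusPlus` (for the instance method in the PR, the reference would presumably be written `this::toTaskStatusPlus`). Both pipelines produce the same list.

```java
import java.util.List;
import java.util.stream.Collectors;

final class MethodRefSketch
{
  // Hypothetical stand-in for the conversion helper in the PR.
  static String toStatusLabel(String taskId)
  {
    return "status:" + taskId;
  }

  // Lambda that only forwards its argument to the helper.
  static List<String> withLambda(List<String> taskIds)
  {
    return taskIds.stream().map(id -> toStatusLabel(id)).collect(Collectors.toList());
  }

  // Same mapping, passing the helper directly as a method reference.
  static List<String> withMethodRef(List<String> taskIds)
  {
    return taskIds.stream().map(MethodRefSketch::toStatusLabel).collect(Collectors.toList());
  }
}
```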



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));

Review Comment:
   We likely want to explicitly default the column to null.  This tends to avoid table locks during DDL: if there is a default value (or if the default that the ALTER TABLE uses is non-null), then the ALTER TABLE can end up locking the table, and that can end up causing other sadness.
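
   A minimal sketch of building the statements with an explicit null default. `AlterTableSketch` and `buildAlterStatements` are hypothetical names, the existing-column set stands in for the PR's `tableContainsColumn` check, and the exact `DEFAULT NULL` syntax is an assumption that may need adjusting per database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class AlterTableSketch
{
  // Returns one ALTER TABLE statement per wanted column that the table does not have yet.
  static List<String> buildAlterStatements(
      String tableName,
      Set<String> existingColumns,
      List<String> wantedColumns
  )
  {
    final List<String> statements = new ArrayList<>();
    for (String column : wantedColumns) {
      if (!existingColumns.contains(column)) {
        // Explicit DEFAULT NULL: existing rows need no backfilled value.
        statements.add(
            String.format("ALTER TABLE %s ADD COLUMN %s VARCHAR(255) DEFAULT NULL", tableName, column)
        );
      }
    }
    return statements;
  }
}
```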



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(

Review Comment:
   Why retry?  Why do we expect it to be normal for this to fail once and then succeed subsequently?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
+              }
+              if (!tableContainsColumn(handle, tableName, "group_id")) {
+                log.info("Adding column: group_id to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
+              }
+              return null;
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception altering table");
+    }
+  }
+
+  @Override
+  public boolean migrateTaskTable()

Review Comment:
   I'm not sure this really belongs in the `MetadataConnector`...  `MetadataTaskStorage` would likely be a more natural place to actually have the migration logic as that is also the place that actually understands what to do with the tables.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
+              }
+              if (!tableContainsColumn(handle, tableName, "group_id")) {
+                log.info("Adding column: group_id to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
+              }
+              return null;
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception altering table");
+    }
+  }
+
+  @Override
+  public boolean migrateTaskTable()
+  {
+    final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get();
+    final String entryType = tablesConfig.getTaskEntryType();
+    final String tableName = tablesConfig.getEntryTable(entryType);
+    return migrateTaskTable(tableName);
+  }
+
+  public boolean migrateTaskTable(String tableName)
+  {
+    log.info("Populate fields task and group_id of task entry table [%s] from payload", tableName);
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle) throws SQLException, IOException
+            {
+              ObjectMapper objectMapper = new ObjectMapper();
+              Connection connection = handle.getConnection();
+              Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
+              boolean flag = true;
+              while (flag) {
+                // Should ideally use a cursor and sort by id for efficiency, but updates with ordering aren't allowed
+                String sql = StringUtils.format(
+                    "SELECT * FROM %1$s WHERE active = false AND type IS null %2$s",
+                    tableName,
+                    limitClause(100)
+                );
+                ResultSet resultSet = statement.executeQuery(sql);
+                flag = false;
+                while (resultSet.next()) {
+                  ObjectNode payload = objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class);
+                  resultSet.updateString("type", payload.get("type").asText());
+                  resultSet.updateString("group_id", payload.get("groupId").asText());
+                  resultSet.updateRow();

Review Comment:
   Fwiw, you probably don't even need to set a "limit" clause on the query anymore if you follow what I suggested above.  Basically, if you just tell JDBI to only give you the first 100 things from the ResultSet, it should be able to just return them and then when you close the cursor, the DB will close things and be happy-ish.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
+              }
+              if (!tableContainsColumn(handle, tableName, "group_id")) {
+                log.info("Adding column: group_id to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
+              }
+              return null;
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception altering table");
+    }
+  }
+
+  @Override
+  public boolean migrateTaskTable()
+  {
+    final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get();
+    final String entryType = tablesConfig.getTaskEntryType();
+    final String tableName = tablesConfig.getEntryTable(entryType);
+    return migrateTaskTable(tableName);
+  }
+
+  public boolean migrateTaskTable(String tableName)
+  {
+    log.info("Populate fields task and group_id of task entry table [%s] from payload", tableName);
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle) throws SQLException, IOException
+            {
+              ObjectMapper objectMapper = new ObjectMapper();
+              Connection connection = handle.getConnection();
+              Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
+              boolean flag = true;
+              while (flag) {
+                // Should ideally use a cursor and sort by id for efficiency, but updates with ordering aren't allowed
+                String sql = StringUtils.format(
+                    "SELECT * FROM %1$s WHERE active = false AND type IS null %2$s",
+                    tableName,
+                    limitClause(100)
+                );
+                ResultSet resultSet = statement.executeQuery(sql);
+                flag = false;
+                while (resultSet.next()) {
+                  ObjectNode payload = objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class);
+                  resultSet.updateString("type", payload.get("type").asText());
+                  resultSet.updateString("group_id", payload.get("groupId").asText());
+                  resultSet.updateRow();

Review Comment:
   Doing updates like this in the same transaction is overkill and might get into weird locking behaviors on the table.  It's best not to do it.  Each update should be effectively 2 queries: one to get the task payloads, which would exit and return all DB resources, and then a second one to actually issue an UPDATE command to set the extra fields based on the taskId.
   
   When the first query stops returning results, that's when you know the migration is complete.
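
   A rough sketch of that loop, with the database replaced by an in-memory map so it runs on its own. `TaskTable`, `fetchUnmigrated`, `updateTypeAndGroup`, and `Row` are hypothetical names, not Druid or JDBI APIs; in real code each of the two methods would be its own short-lived query. It assumes every payload yields a non-null type, otherwise the same rows would be fetched forever.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TwoQueryMigrationSketch
{
  // Hypothetical row holding only the fields the migration touches.
  static final class Row
  {
    final String id;
    final String payloadType;     // stands in for "type" parsed from the payload
    final String payloadGroupId;  // stands in for "groupId" parsed from the payload
    String type;                  // new column, null until migrated
    String groupId;               // new column, null until migrated

    Row(String id, String payloadType, String payloadGroupId)
    {
      this.id = id;
      this.payloadType = payloadType;
      this.payloadGroupId = payloadGroupId;
    }
  }

  // Hypothetical storage abstraction; each call models one independent query.
  interface TaskTable
  {
    List<Row> fetchUnmigrated(int limit);                               // query 1: read a batch
    void updateTypeAndGroup(String id, String type, String groupId);    // query 2: UPDATE by task id
  }

  // Repeats read-then-update until the read returns nothing; returns the number of rows updated.
  static int migrate(TaskTable table, int batchSize)
  {
    int migrated = 0;
    while (true) {
      final List<Row> batch = table.fetchUnmigrated(batchSize);
      if (batch.isEmpty()) {
        return migrated;  // the first query found nothing left, so the migration is complete
      }
      for (Row row : batch) {
        table.updateTypeAndGroup(row.id, row.payloadType, row.payloadGroupId);
        migrated++;
      }
    }
  }

  // In-memory implementation used only to exercise the loop.
  static final class InMemoryTaskTable implements TaskTable
  {
    final Map<String, Row> rows = new LinkedHashMap<>();

    @Override
    public List<Row> fetchUnmigrated(int limit)
    {
      final List<Row> result = new ArrayList<>();
      for (Row row : rows.values()) {
        if (row.type == null && result.size() < limit) {
          result.add(row);
        }
      }
      return result;
    }

    @Override
    public void updateTypeAndGroup(String id, String type, String groupId)
    {
      final Row row = rows.get(id);
      row.type = type;
      row.groupId = groupId;
    }
  }
}
```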



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

