You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/22 03:50:33 UTC

[GitHub] QiuMM closed pull request #6592: tasks tables in metadata storage are not cleared

QiuMM closed pull request #6592: tasks tables in metadata storage are not cleared
URL: https://github.com/apache/incubator-druid/pull/6592
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java
index 1614464d76a..d25829cd0bc 100644
--- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java
+++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java
@@ -130,6 +130,13 @@ void insert(
    */
   void removeLock(long lockId);
 
+  /**
+   * Remove inactive tasks created before the given timestamp, together with their log entries.
+   * 
+   * @param timestamp timestamp in milliseconds
+   */
+  void removeTasksOlderThan(long timestamp);
+
   /**
    * Add a log to the entry with the given id.
    *
diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index f30c555eb44..eceab634443 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -525,13 +525,13 @@ If you are running the indexing service in remote mode, the task logs must be st
 |--------|-----------|-------|
 |`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file|
 
-You can also configure the Overlord to automatically retain the task logs only for last x milliseconds by configuring following additional properties.
+You can also configure the Overlord to automatically retain task logs in the log directory, and entries in the task-related metadata storage tables, only for the last x milliseconds by configuring the following additional properties.
 Caution: Automatic log file deletion typically works based on log file modification timestamp on the backing store, so large clock skews between druid nodes and backing store nodes might result in un-intended behavior.  
 
 |Property|Description|Default|
 |--------|-----------|-------|
-|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false|
-|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs to be retained created in last x milliseconds. |None|
+|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. If set to true, the Overlord periodically runs a cleanup, at the interval specified by `druid.indexer.logs.kill.delay`, that deletes task logs from the log directory as well as entries in the tasks and tasklogs metadata storage tables, except for tasks created within the last `druid.indexer.logs.kill.durationToRetain` milliseconds. |false|
+|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. Retention period in milliseconds: task logs and entries in task-related metadata storage tables created within the last x milliseconds are retained. |None|
 |`druid.indexer.logs.kill.initialDelay`| Optional. Number of milliseconds after overlord start when first auto kill is run. |random value less than 300000 (5 mins)|
 |`druid.indexer.logs.kill.delay`|Optional. Number of milliseconds of delay between successive executions of auto kill run. |21600000 (6 hours)|
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
index 006286ac841..0480e179783 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -362,6 +362,26 @@ public void removeLock(final String taskid, final TaskLock taskLock)
     }
   }
 
+  @Override
+  public void removeTasksOlderThan(final long timestamp)
+  {
+    giant.lock();
+
+    try {
+      List<String> taskIds = tasks.entrySet().stream()
+                                  .filter(entry -> entry.getValue().getStatus().isComplete()
+                                                   && entry.getValue().getCreatedDate().isBefore(timestamp))
+                                  .map(entry -> entry.getKey())
+                                  .collect(Collectors.toList());
+
+      taskIds.forEach(taskActions::removeAll);
+      taskIds.forEach(tasks::remove);
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
   @Override
   public List<TaskLock> getLocks(final String taskid)
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
index 808fdb79729..a93e385edd2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
@@ -276,6 +276,12 @@ public void removeLock(String taskid, TaskLock taskLockToRemove)
     }
   }
 
+  @Override
+  public void removeTasksOlderThan(long timestamp)
+  {
+    handler.removeTasksOlderThan(timestamp);
+  }
+
   @Override
   public List<TaskLock> getLocks(String taskid)
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
index b2f55f0c9d4..a880085ce30 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
@@ -75,6 +75,13 @@
    */
   void removeLock(String taskid, TaskLock taskLock);
 
+  /**
+   * Remove completed tasks created before the given timestamp; tasks that are still active are kept.
+   *
+   * @param timestamp timestamp in milliseconds
+   */
+  void removeTasksOlderThan(long timestamp);
+
   /**
    * Returns task as stored in the storage facility. If the task ID does not exist, this will return an
    * absentee Optional.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java
index 18da374fae7..4ab54404925 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.overlord.helpers;
 
 import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.tasklogs.TaskLogKiller;
@@ -35,15 +36,18 @@
 
   private final TaskLogKiller taskLogKiller;
   private final TaskLogAutoCleanerConfig config;
+  private final TaskStorage taskStorage;
 
   @Inject
   public TaskLogAutoCleaner(
       TaskLogKiller taskLogKiller,
-      TaskLogAutoCleanerConfig config
+      TaskLogAutoCleanerConfig config,
+      TaskStorage taskStorage
   )
   {
     this.taskLogKiller = taskLogKiller;
     this.config = config;
+    this.taskStorage = taskStorage;
   }
 
   @Override
@@ -67,7 +71,9 @@ public void schedule(ScheduledExecutorService exec)
           public void run()
           {
             try {
-              taskLogKiller.killOlderThan(System.currentTimeMillis() - config.getDurationToRetain());
+              long timestamp = System.currentTimeMillis() - config.getDurationToRetain();
+              taskLogKiller.killOlderThan(timestamp);
+              taskStorage.removeTasksOlderThan(timestamp);
             }
             catch (Exception ex) {
               log.error(ex, "Failed to clean-up the task logs");
diff --git a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java
index 039a6427f02..bff95785f8f 100644
--- a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java
+++ b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java
@@ -91,4 +91,13 @@ private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String data
     }
     return sql;
   }
+
+  @Deprecated
+  @Override
+  public String getSqlRemoveLogsOlderThan()
+  {
+    return StringUtils.format("DELETE FROM %s WHERE %s_id in ("
+                              + " SELECT id FROM %s WHERE created_date < :date_time and active = false)",
+                              getLogTable(), getEntryTypeName(), getEntryTable());
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
index 2f902365e74..e967fb8f09e 100644
--- a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
@@ -89,4 +89,14 @@ private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String data
     }
     return sql;
   }
+
+  @Deprecated
+  @Override
+  public String getSqlRemoveLogsOlderThan()
+  {
+    return StringUtils.format("DELETE FROM %s USING %s "
+                              + "WHERE %s_id = %s.id AND created_date < :date_time and active = false",
+                              getLogTable(), getEntryTable(), getEntryTypeName(), getEntryTable());
+  }
+
 }
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
index ebee2c192ff..da8bd61b9ee 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -113,6 +113,16 @@ protected String getEntryTable()
     return entryTable;
   }
 
+  protected String getLogTable()
+  {
+    return logTable;
+  }
+
+  protected String getEntryTypeName()
+  {
+    return entryTypeName;
+  }
+
   public TypeReference getEntryType()
   {
     return entryType;
@@ -439,6 +449,29 @@ public Void withHandle(Handle handle)
     );
   }
 
+  @Override
+  public void removeTasksOlderThan(final long timestamp)
+  {
+    DateTime dateTime = DateTimes.utc(timestamp);
+    connector.retryWithHandle(
+        (HandleCallback<Void>) handle -> {
+          handle.createStatement(getSqlRemoveLogsOlderThan())
+                .bind("date_time", dateTime.toString())
+                .execute();
+          handle.createStatement(
+              StringUtils.format(
+                  "DELETE FROM %s WHERE created_date < :date_time AND active = false",
+                  entryTable
+              )
+          )
+                .bind("date_time", dateTime.toString())
+                .execute();
+
+          return null;
+        }
+    );
+  }
+
   private int removeLock(Handle handle, long lockId)
   {
     return handle.createStatement(StringUtils.format("DELETE FROM %s WHERE id = :id", lockTable))
@@ -508,6 +541,15 @@ public Boolean withHandle(Handle handle) throws Exception
     );
   }
 
+  @Deprecated
+  public String getSqlRemoveLogsOlderThan()
+  {
+    return StringUtils.format("DELETE a FROM %s a INNER JOIN %s b ON a.%s_id = b.id "
+                              + "WHERE b.created_date < :date_time and b.active = false",
+                              logTable, entryTable, entryTypeName
+    );
+  }
+
   @Override
   public Map<Long, LockType> getLocks(final String entryId)
   {
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
index 46dc7f49049..c34ff93ebbc 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
@@ -381,4 +381,65 @@ public void testGetLockId() throws EntryExistsException
     Assert.assertNotNull(handler.getLockId(entryId, lock1));
     Assert.assertNull(handler.getLockId(entryId, lock2));
   }
+
+  @Test
+  public void testRemoveTasksOlderThan() throws Exception
+  {
+    final String entryId1 = "1234";
+    Map<String, Integer> entry1 = ImmutableMap.of("numericId", 1234);
+    Map<String, Integer> status1 = ImmutableMap.of("count", 42, "temp", 1);
+    handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1);
+    Assert.assertTrue(handler.addLog(entryId1, ImmutableMap.of("logentry", "created")));
+
+    final String entryId2 = "ABC123";
+    Map<String, Integer> entry2 = ImmutableMap.of("a", 1);
+    Map<String, Integer> status2 = ImmutableMap.of("count", 42);
+    handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2);
+    Assert.assertTrue(handler.addLog(entryId2, ImmutableMap.of("logentry", "created")));
+
+    final String entryId3 = "DEF5678";
+    Map<String, Integer> entry3 = ImmutableMap.of("numericId", 5678);
+    Map<String, Integer> status3 = ImmutableMap.of("count", 21, "temp", 2);
+    handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3);
+    Assert.assertTrue(handler.addLog(entryId3, ImmutableMap.of("logentry", "created")));
+
+    Assert.assertEquals(Optional.of(entry1), handler.getEntry(entryId1));
+    Assert.assertEquals(Optional.of(entry2), handler.getEntry(entryId2));
+    Assert.assertEquals(Optional.of(entry3), handler.getEntry(entryId3));
+
+    Assert.assertEquals(
+        ImmutableList.of(entryId2),
+        handler.getActiveTaskInfo(null).stream()
+               .map(taskInfo -> taskInfo.getId())
+               .collect(Collectors.toList())
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(entryId3, entryId1),
+        handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream()
+               .map(taskInfo -> taskInfo.getId())
+               .collect(Collectors.toList())
+
+    );
+
+    handler.removeTasksOlderThan(DateTimes.of("2014-01-02").getMillis());
+    // active task not removed.
+    Assert.assertEquals(
+        ImmutableList.of(entryId2),
+        handler.getActiveTaskInfo(null).stream()
+               .map(taskInfo -> taskInfo.getId())
+               .collect(Collectors.toList())
+    );
+    Assert.assertEquals(
+        ImmutableList.of(entryId3),
+        handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream()
+               .map(taskInfo -> taskInfo.getId())
+               .collect(Collectors.toList())
+
+    );
+    // tasklogs
+    Assert.assertEquals(0, handler.getLogs(entryId1).size());
+    Assert.assertEquals(1, handler.getLogs(entryId2).size());
+    Assert.assertEquals(1, handler.getLogs(entryId3).size());
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org