You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2018/11/20 21:42:52 UTC

[incubator-druid] branch master updated: Added support for filtering by unused parameter for HeapMemoryTaskStorage (#6510)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 81c9a61  Added support for filtering by unused parameter for HeapMemoryTaskStorage (#6510)
81c9a61 is described below

commit 81c9a6177cc6958942eaa9ea294fe96c4a3708f4
Author: Marat <sa...@gmail.com>
AuthorDate: Wed Nov 21 00:42:44 2018 +0300

    Added support for filtering by unused parameter for HeapMemoryTaskStorage (#6510)
    
    * 1. Added support for the previously unused DateTime start parameter in the getRecentlyFinishedTaskInfoSince method:
     HeapMemoryTaskStorage.getRecentlyFinishedTaskInfoSince now returns only the finished tasks whose TaskStuff.createdDate is after the start time.
    2. Added filtering by complete status to the TaskStuff list stream in the HeapMemoryTaskStorage.getNRecentlyFinishedTaskInfo method.
    3. Renamed methods and parameters to make clear that the public API method OverlordResource.getTasks returns the list of completed tasks whose createdDate, not completion date, falls within the interval parameter.
    
    * 1. Added support for the previously unused DateTime start parameter in the getRecentlyFinishedTaskInfoSince method:
     HeapMemoryTaskStorage.getRecentlyFinishedTaskInfoSince now returns only the finished tasks whose TaskStuff.createdDate is after the start time.
    2. Added filtering by complete status to the TaskStuff list stream in the HeapMemoryTaskStorage.getNRecentlyFinishedTaskInfo method.
    3. Renamed methods and parameters to make clear that the public API method OverlordResource.getTasks returns the list of completed tasks whose createdDate, not completion date, falls within the interval parameter.
    
    * Fixed OverlordResourceTest to support the changed method names
    
    * Changed method and parameter names to make them easier to understand.
    
    * Replaced String.replace() with StringUtils.replace() (#6607)
    
    * Fixed checkstyle error
---
 .../druid/indexing/overlord/HeapMemoryTaskStorage.java  | 17 +++++++++--------
 .../druid/indexing/overlord/MetadataTaskStorage.java    |  6 +++---
 .../org/apache/druid/indexing/overlord/TaskStorage.java |  6 +++---
 .../indexing/overlord/TaskStorageQueryAdapter.java      |  4 ++--
 .../druid/indexing/overlord/http/OverlordResource.java  | 12 ++++++------
 .../indexing/overlord/http/OverlordResourceTest.java    | 14 +++++++-------
 6 files changed, 30 insertions(+), 29 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
index 006286a..51b38d0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -221,9 +221,9 @@ public class HeapMemoryTaskStorage implements TaskStorage
   }
 
   @Override
-  public List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
+  public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
       @Nullable Integer maxTaskStatuses,
-      @Nullable Duration duration,
+      @Nullable Duration durationBeforeNow,
       @Nullable String datasource
   )
   {
@@ -240,18 +240,18 @@ public class HeapMemoryTaskStorage implements TaskStorage
       }.reverse();
 
       return maxTaskStatuses == null ?
-             getRecentlyFinishedTaskInfoSince(
-                 DateTimes.nowUtc().minus(duration == null ? config.getRecentlyFinishedThreshold() : duration),
+             getRecentlyCreatedAlreadyFinishedTaskInfoSince(
+                 DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
                  createdDateDesc
              ) :
-             getNRecentlyFinishedTaskInfo(maxTaskStatuses, createdDateDesc);
+             getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc);
     }
     finally {
       giant.unlock();
     }
   }
 
-  private List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfoSince(
+  private List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfoSince(
       DateTime start,
       Ordering<TaskStuff> createdDateDesc
   )
@@ -262,7 +262,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
       List<TaskStuff> list = createdDateDesc
           .sortedCopy(tasks.values())
           .stream()
-          .filter(taskStuff -> taskStuff.getStatus().isComplete())
+          .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.createdDate.isAfter(start))
           .collect(Collectors.toList());
       final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
       for (final TaskStuff taskStuff : list) {
@@ -283,7 +283,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
     }
   }
 
-  private List<TaskInfo<Task, TaskStatus>> getNRecentlyFinishedTaskInfo(int n, Ordering<TaskStuff> createdDateDesc)
+  private List<TaskInfo<Task, TaskStatus>> getNRecentlyCreatedAlreadyFinishedTaskInfo(int n, Ordering<TaskStuff> createdDateDesc)
   {
     giant.lock();
 
@@ -291,6 +291,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
       List<TaskStuff> list = createdDateDesc
           .sortedCopy(tasks.values())
           .stream()
+          .filter(taskStuff -> taskStuff.getStatus().isComplete())
           .limit(n)
           .collect(Collectors.toList());
       final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
index 808fdb7..f0d9d37 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
@@ -208,15 +208,15 @@ public class MetadataTaskStorage implements TaskStorage
   }
 
   @Override
-  public List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
+  public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
       @Nullable Integer maxTaskStatuses,
-      @Nullable Duration duration,
+      @Nullable Duration durationBeforeNow,
       @Nullable String datasource
   )
   {
     return ImmutableList.copyOf(
         handler.getCompletedTaskInfo(
-            DateTimes.nowUtc().minus(duration == null ? config.getRecentlyFinishedThreshold() : duration),
+            DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
             maxTaskStatuses,
             datasource
         )
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
index b2f55f0..1edd52c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
@@ -141,14 +141,14 @@ public interface TaskStorage
    * return nothing.
    *
    * @param maxTaskStatuses maxTaskStatuses
-   * @param duration        duration
+   * @param durationBeforeNow duration
    * @param datasource      datasource
    *
    * @return list of {@link TaskInfo}
    */
-  List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
+  List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
       @Nullable Integer maxTaskStatuses,
-      @Nullable Duration duration,
+      @Nullable Duration durationBeforeNow,
       @Nullable String datasource
   );
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
index fd61752..6a12b40 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
@@ -58,13 +58,13 @@ public class TaskStorageQueryAdapter
     return storage.getActiveTaskInfo(dataSource);
   }
 
-  public List<TaskInfo<Task, TaskStatus>> getRecentlyCompletedTaskInfo(
+  public List<TaskInfo<Task, TaskStatus>> getCompletedTaskInfoByCreatedTimeDuration(
       @Nullable Integer maxTaskStatuses,
       @Nullable Duration duration,
       @Nullable String dataSource
   )
   {
-    return storage.getRecentlyFinishedTaskInfo(maxTaskStatuses, duration, dataSource);
+    return storage.getRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, duration, dataSource);
   }
 
   public Optional<Task> getTask(final String taskid)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index f0748f9..92f01ac 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -627,7 +627,7 @@ public class OverlordResource
   public Response getTasks(
       @QueryParam("state") final String state,
       @QueryParam("datasource") final String dataSource,
-      @QueryParam("interval") final String interval,
+      @PathParam("createdTimeInterval") final String createdTimeInterval,
       @QueryParam("max") final Integer maxCompletedTasks,
       @QueryParam("type") final String type,
       @Context final HttpServletRequest req
@@ -692,13 +692,13 @@ public class OverlordResource
 
     //checking for complete tasks first to avoid querying active tasks if user only wants complete tasks
     if (state == null || "complete".equals(StringUtils.toLowerCase(state))) {
-      Duration duration = null;
-      if (interval != null) {
-        final Interval theInterval = Intervals.of(interval.replace('_', '/'));
-        duration = theInterval.toDuration();
+      Duration createdTimeDuration = null;
+      if (createdTimeInterval != null) {
+        final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/"));
+        createdTimeDuration = theInterval.toDuration();
       }
       final List<TaskInfo<Task, TaskStatus>> taskInfoList =
-          taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(maxCompletedTasks, duration, dataSource);
+          taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(maxCompletedTasks, createdTimeDuration, dataSource);
       final List<TaskStatusPlus> completedTasks = taskInfoList.stream()
                                                               .map(completeTaskTransformFunc::apply)
                                                               .collect(Collectors.toList());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index 7964c76..e3885f7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -233,7 +233,7 @@ public class OverlordResourceTest
             new MockTaskRunnerWorkItem(tasksIds.get(1), null),
             new MockTaskRunnerWorkItem(tasksIds.get(2), null)));
 
-    EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_1",
@@ -259,7 +259,7 @@ public class OverlordResourceTest
         )
     );
     EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
-    Assert.assertTrue(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null).size() == 3);
+    Assert.assertTrue(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null).size() == 3);
     Assert.assertTrue(taskRunner.getRunningTasks().size() == 3);
     List<TaskStatusPlus> responseObjects = (List) overlordResource
           .getCompleteTasks(null, req).getEntity();
@@ -313,7 +313,7 @@ public class OverlordResourceTest
   {
     expectAuthorizationTokenCheck();
     //completed tasks
-    EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_5",
@@ -403,7 +403,7 @@ public class OverlordResourceTest
   {
     expectAuthorizationTokenCheck();
     //completed tasks
-    EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, "allow")).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, "allow")).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_5",
@@ -667,7 +667,7 @@ public class OverlordResourceTest
   public void testGetTasksFilterCompleteState()
   {
     expectAuthorizationTokenCheck();
-    EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_1",
@@ -707,7 +707,7 @@ public class OverlordResourceTest
     expectAuthorizationTokenCheck();
     List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3");
     Duration duration = new Period("PT86400S").toStandardDuration();
-    EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, duration, null)).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, duration, null)).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_1",
@@ -747,7 +747,7 @@ public class OverlordResourceTest
   public void testGetNullCompleteTask()
   {
     expectAuthorizationTokenCheck();
-    EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_1",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org