You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/19 23:33:48 UTC

[GitHub] jon-wei closed pull request #5998: Add support to filter on datasource for active tasks

jon-wei closed pull request #5998: Add support to filter on datasource for active tasks
URL: https://github.com/apache/incubator-druid/pull/5998
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a pull request from a fork once it has been merged, the
diff is reproduced below for the sake of provenance:

diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java b/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java
index 892bf761716..dca2dd006b9 100644
--- a/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java
+++ b/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java
@@ -120,7 +120,7 @@ void insert(
    *
    * @return list of {@link TaskInfo}
    */
-  List<TaskInfo<EntryType>> getActiveTaskInfo();
+  List<TaskInfo<EntryType>> getActiveTaskInfo(@Nullable String dataSource);
 
   /**
    * Return createdDate and dataSource for the given id
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
index 2e91c3f6026..341cf8ae41d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -170,7 +170,7 @@ public void setStatus(TaskStatus status)
   }
 
   @Override
-  public List<TaskInfo<Task>> getActiveTaskInfo()
+  public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
   {
     giant.lock();
 
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
index e8489cffdb1..b927497b022 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
@@ -214,10 +214,10 @@ public Task apply(@Nullable Pair<Task, TaskStatus> input)
   }
 
   @Override
-  public List<TaskInfo<Task>> getActiveTaskInfo()
+  public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
   {
     return ImmutableList.copyOf(
-        handler.getActiveTaskInfo()
+        handler.getActiveTaskInfo(dataSource)
     );
   }
 
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
index da7a07ab76f..b24dd35a123 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
@@ -127,9 +127,11 @@
    * Returns a list of currently running or pending tasks as stored in the storage facility as {@link TaskInfo}. No particular order
    * is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.
    *
+   * @param dataSource datasource to filter the active tasks by, or {@code null} to not filter by datasource
+   *
    * @return list of {@link TaskInfo}
    */
-  List<TaskInfo<Task>> getActiveTaskInfo();
+  List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource);
 
   /**
    * Returns up to {@code maxTaskStatuses} {@link TaskInfo} objects of recently finished tasks as stored in the storage facility. No
@@ -137,7 +139,8 @@
    * No particular standard of "recent" is guaranteed, and in fact, this method is permitted to simply return nothing.
    *
    * @param maxTaskStatuses maxTaskStatuses
-   * @param duration duration
+   * @param duration        duration
+   * @param datasource      datasource
    *
    * @return list of {@link TaskInfo}
    */
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java
index 4c3815cf9aa..c1cd2b3b7df 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java
@@ -55,9 +55,9 @@ public TaskStorageQueryAdapter(TaskStorage storage)
     return storage.getActiveTasks();
   }
 
-  public List<TaskInfo<Task>> getActiveTaskInfo()
+  public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
   {
-    return storage.getActiveTaskInfo();
+    return storage.getActiveTaskInfo(dataSource);
   }
 
   public List<TaskInfo<Task>> getRecentlyCompletedTaskInfo(
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index 895ea8ff3d5..68b5ff69167 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -629,10 +629,10 @@ public Response getTasks(
       finalTaskList.addAll(completedTasks);
     }
 
-    List<TaskInfo<Task>> allActiveTaskInfo = Lists.newArrayList();
+    final List<TaskInfo<Task>> allActiveTaskInfo;
     final List<AnyTask> allActiveTasks = Lists.newArrayList();
     if (state == null || !"complete".equals(StringUtils.toLowerCase(state))) {
-      allActiveTaskInfo = taskStorageQueryAdapter.getActiveTaskInfo();
+      allActiveTaskInfo = taskStorageQueryAdapter.getActiveTaskInfo(dataSource);
       for (final TaskInfo<Task> task : allActiveTaskInfo) {
         allActiveTasks.add(
             new AnyTask(
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java
index a8ad29c8d35..3a02c8bee2d 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -172,7 +172,7 @@ public void testIsLeader()
   public void testSecuredGetWaitingTask()
   {
     expectAuthorizationTokenCheck();
-    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_1",
@@ -278,7 +278,7 @@ public void testSecuredGetRunningTasks()
             new MockTaskRunnerWorkItem(tasksIds.get(1), null)
         )
     );
-    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_1",
@@ -337,7 +337,7 @@ public void testGetTasks()
         )
     );
     //active tasks
-    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_1",
@@ -427,7 +427,7 @@ public void testGetTasksFilterDataSource()
         )
     );
     //active tasks
-    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_1",
@@ -491,7 +491,7 @@ public void testGetTasksFilterWaitingState()
   {
     expectAuthorizationTokenCheck();
     //active tasks
-    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_1",
@@ -549,7 +549,7 @@ public void testGetTasksFilterWaitingState()
   public void testGetTasksFilterRunningState()
   {
     expectAuthorizationTokenCheck();
-    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_1",
@@ -615,7 +615,7 @@ public void testGetTasksFilterPendingState()
             new MockTaskRunnerWorkItem(tasksIds.get(1), null)
         )
     );
-    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo()).andStubReturn(
+    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
         ImmutableList.of(
             new TaskInfo(
                 "id_1",
diff --git a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
index a6453327cd6..ce05ec2c903 100644
--- a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
@@ -46,7 +46,7 @@
 
   @Override
   protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
-      Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource
+      Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
   )
   {
     String sql = StringUtils.format(
@@ -59,7 +59,7 @@
         + "FROM "
         + "  %s "
         + "WHERE "
-        + getWhereClauseForInactiveStatusesSinceQuery(datasource)
+        + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
         + "ORDER BY created_date DESC",
         getEntryTable()
     );
@@ -72,8 +72,8 @@
     if (maxNumStatuses != null) {
       query = query.bind("n", maxNumStatuses);
     }
-    if (datasource != null) {
-      query = query.bind("ds", datasource);
+    if (dataSource != null) {
+      query = query.bind("ds", dataSource);
     }
     return query;
   }
diff --git a/server/src/main/java/io/druid/metadata/MySQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/MySQLMetadataStorageActionHandler.java
index 85cf91549fe..32d2fdb060f 100644
--- a/server/src/main/java/io/druid/metadata/MySQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/MySQLMetadataStorageActionHandler.java
@@ -46,7 +46,7 @@
 
   @Override
   protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
-      Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource
+      Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
   )
   {
     String sql = StringUtils.format(
@@ -59,7 +59,7 @@
         + "FROM "
         + "  %s "
         + "WHERE "
-        + getWhereClauseForInactiveStatusesSinceQuery(datasource)
+        + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
         + "ORDER BY created_date DESC",
         getEntryTable()
     );
@@ -72,8 +72,8 @@
     if (maxNumStatuses != null) {
       query = query.bind("n", maxNumStatuses);
     }
-    if (datasource != null) {
-      query = query.bind("ds", datasource);
+    if (dataSource != null) {
+      query = query.bind("ds", dataSource);
     }
     return query;
   }
diff --git a/server/src/main/java/io/druid/metadata/PostgreSQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
index ea31a65df32..08cb91c781d 100644
--- a/server/src/main/java/io/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
@@ -46,7 +46,7 @@ public PostgreSQLMetadataStorageActionHandler(
 
   @Override
   protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
-      Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource
+      Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
   )
   {
     String sql = StringUtils.format(
@@ -59,7 +59,7 @@ public PostgreSQLMetadataStorageActionHandler(
         + "FROM "
         + "  %s "
         + "WHERE "
-        + getWhereClauseForInactiveStatusesSinceQuery(datasource)
+        + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
         + "ORDER BY created_date DESC",
         getEntryTable()
     );
@@ -73,8 +73,8 @@ public PostgreSQLMetadataStorageActionHandler(
     if (maxNumStatuses != null) {
       query = query.bind("n", maxNumStatuses);
     }
-    if (datasource != null) {
-      query = query.bind("ds", datasource);
+    if (dataSource != null) {
+      query = query.bind("ds", dataSource);
     }
     return query;
   }
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
index 9231028123a..8374ecee07d 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -333,7 +333,7 @@ public Boolean withHandle(Handle handle) throws Exception
   public List<TaskInfo<EntryType>> getCompletedTaskInfo(
       DateTime timestamp,
       @Nullable Integer maxNumStatuses,
-      @Nullable String datasource
+      @Nullable String dataSource
   )
   {
     return getConnector().retryWithHandle(
@@ -342,7 +342,7 @@ public Boolean withHandle(Handle handle) throws Exception
               handle,
               timestamp,
               maxNumStatuses,
-              datasource
+              dataSource
           );
           return query.map(new TaskInfoMapper()).list();
         }
@@ -350,20 +350,52 @@ public Boolean withHandle(Handle handle) throws Exception
   }
 
   @Override
-  public List<TaskInfo<EntryType>> getActiveTaskInfo()
+  public List<TaskInfo<EntryType>> getActiveTaskInfo(@Nullable String dataSource)
   {
     return getConnector().retryWithHandle(
         handle -> {
-          return handle.createQuery(
-              StringUtils.format(
-                  "SELECT id, status_payload, payload, datasource, created_date FROM %s WHERE active = TRUE ORDER BY created_date",
-                  entryTable
-              )
-          ).map(new TaskInfoMapper()).list();
+          final Query<Map<String, Object>> query = createActiveStatusesQuery(
+              handle,
+              dataSource
+          );
+          return query.map(new TaskInfoMapper()).list();
         }
     );
   }
 
+  private Query<Map<String, Object>> createActiveStatusesQuery(Handle handle, @Nullable String dataSource)
+  { // Builds the query that selects active entries from entryTable, ordered by created_date.
+    String sql = StringUtils.format(
+        "SELECT "
+        + "  id, "
+        + "  status_payload, "
+        + "  payload, "
+        + "  datasource, "
+        + "  created_date "
+        + "FROM "
+        + "  %s "
+        + "WHERE "
+        + getWhereClauseForActiveStatusesQuery(dataSource) // contains the :ds placeholder only for a non-null dataSource
+        + "ORDER BY created_date",
+        entryTable // substituted for the %s table-name placeholder above
+    );
+
+    Query<Map<String, Object>> query = handle.createQuery(sql);
+    if (dataSource != null) { // bind :ds only when the WHERE clause actually contains it
+      query = query.bind("ds", dataSource);
+    }
+    return query;
+  }
+
+  private String getWhereClauseForActiveStatusesQuery(@Nullable String dataSource)
+  { // Returns the WHERE-clause body: always "active = TRUE", plus a ":ds" datasource filter when one is given.
+    if (dataSource == null) {
+      return "active = TRUE ";
+    }
+    // The :ds placeholder is bound by createActiveStatusesQuery when dataSource is non-null.
+    return "active = TRUE  AND datasource = :ds ";
+  }
+
   class TaskInfoMapper implements ResultSetMapper<TaskInfo<EntryType>>
   {
     @Override
@@ -401,7 +433,7 @@ public Boolean withHandle(Handle handle) throws Exception
       Handle handle,
       DateTime timestamp,
       @Nullable Integer maxNumStatuses,
-      @Nullable String datasource
+      @Nullable String dataSource
   );
 
   @Override
diff --git a/server/src/main/java/io/druid/metadata/SQLServerMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLServerMetadataStorageActionHandler.java
index 76e155c62e6..6fd354e261d 100644
--- a/server/src/main/java/io/druid/metadata/SQLServerMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/SQLServerMetadataStorageActionHandler.java
@@ -46,7 +46,7 @@ public SQLServerMetadataStorageActionHandler(
 
   @Override
   protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
-      Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String datasource
+      Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
   )
   {
     String sql = maxNumStatuses == null ? "SELECT " : "SELECT TOP (:n) ";
@@ -60,7 +60,7 @@ public SQLServerMetadataStorageActionHandler(
         + "FROM "
         + "  %s "
         + "WHERE "
-        + getWhereClauseForInactiveStatusesSinceQuery(datasource)
+        + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
         + "ORDER BY created_date DESC",
         getEntryTable()
     );
@@ -70,8 +70,8 @@ public SQLServerMetadataStorageActionHandler(
     if (maxNumStatuses != null) {
       query = query.bind("n", maxNumStatuses);
     }
-    if (datasource != null) {
-      query = query.bind("ds", datasource);
+    if (dataSource != null) {
+      query = query.bind("ds", dataSource);
     }
     return query;
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org