You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2018/07/31 01:35:09 UTC

[incubator-druid] branch master updated: Remove some unnecessary task storage internal APIs. (#6058)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aa7017  Remove some unnecessary task storage internal APIs. (#6058)
3aa7017 is described below

commit 3aa70179751d049669698f4becd8bb6cbee442fe
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Mon Jul 30 18:35:06 2018 -0700

    Remove some unnecessary task storage internal APIs. (#6058)
    
    * Remove some unnecessary task storage internal APIs.
    
    - Remove MetadataStorageActionHandler's getInactiveStatusesSince and getActiveEntriesWithStatus.
    - Remove TaskStorage's getCreatedDateTimeAndDataSource.
    - Remove TaskStorageQueryAdapter's getCreatedTime and getCreatedDateAndDataSource.
    - Migrate all callers to getActiveTaskInfo and getCompletedTaskInfo.
    
    This has one side effect: getActiveTaskInfo (now used instead) warns and continues
    when it sees unreadable tasks, whereas getActiveEntriesWithStatus threw an exception
    when it encountered them. After this patch, bad tasks are therefore ignored when
    syncing from metadata storage rather than causing an exception to be thrown.
    
    IMO, this is an improvement, since the most likely reason for bad tasks is either:
    
    - A new version introduced an additional validation, and a pre-existing task doesn't
      pass it.
    - You are rolling back from a newer version to an older version.
    
    In both cases, I believe you would want to skip tasks that can't be deserialized,
    rather than blocking overlord startup.
    
    * Remove unused import.
    
    * Fix formatting.
    
    * Fix formatting.
---
 api/src/main/java/io/druid/indexer/TaskInfo.java   |  17 +--
 .../metadata/MetadataStorageActionHandler.java     |  34 +-----
 .../sqlserver/CustomStatementRewriterTest.java     |  15 ---
 .../indexing/overlord/HeapMemoryTaskStorage.java   |  35 ++----
 .../overlord/IndexerMetadataStorageAdapter.java    |   9 +-
 .../indexing/overlord/MetadataTaskStorage.java     |  46 ++------
 .../io/druid/indexing/overlord/TaskStorage.java    |   9 +-
 .../indexing/overlord/TaskStorageQueryAdapter.java |  18 +--
 .../indexing/overlord/http/OverlordResource.java   |   8 +-
 .../IndexerMetadataStorageAdapterTest.java         |  52 +++++++--
 .../DerbyMetadataStorageActionHandler.java         |   2 +-
 .../MySQLMetadataStorageActionHandler.java         |   2 +-
 .../PostgreSQLMetadataStorageActionHandler.java    |   2 +-
 .../metadata/SQLMetadataStorageActionHandler.java  | 124 ++-------------------
 .../SQLServerMetadataStorageActionHandler.java     |   2 +-
 .../SQLMetadataStorageActionHandlerTest.java       |  39 +++++--
 16 files changed, 123 insertions(+), 291 deletions(-)

diff --git a/api/src/main/java/io/druid/indexer/TaskInfo.java b/api/src/main/java/io/druid/indexer/TaskInfo.java
index e20e9ee..32b27a2 100644
--- a/api/src/main/java/io/druid/indexer/TaskInfo.java
+++ b/api/src/main/java/io/druid/indexer/TaskInfo.java
@@ -18,6 +18,7 @@
  */
 package io.druid.indexer;
 
+import com.google.common.base.Preconditions;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -25,11 +26,11 @@ import javax.annotation.Nullable;
 /**
  * This class is used to store task info from runner query and cache in OverlordResource
  */
-public class TaskInfo<EntryType>
+public class TaskInfo<EntryType, StatusType>
 {
   private final String id;
   private final DateTime createdTime;
-  private final TaskStatus status;
+  private final StatusType status;
   private final String dataSource;
   @Nullable
   private final EntryType task;
@@ -37,15 +38,15 @@ public class TaskInfo<EntryType>
   public TaskInfo(
       String id,
       DateTime createdTime,
-      TaskStatus status,
+      StatusType status,
       String dataSource,
       @Nullable EntryType task
   )
   {
-    this.id = id;
-    this.createdTime = createdTime;
-    this.status = status;
-    this.dataSource = dataSource;
+    this.id = Preconditions.checkNotNull(id, "id");
+    this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime");
+    this.status = Preconditions.checkNotNull(status, "status");
+    this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.task = task;
   }
 
@@ -59,7 +60,7 @@ public class TaskInfo<EntryType>
     return createdTime;
   }
 
-  public TaskStatus getStatus()
+  public StatusType getStatus()
   {
     return status;
   }
diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java b/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java
index dca2dd0..0fc4312 100644
--- a/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java
+++ b/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java
@@ -21,7 +21,6 @@ package io.druid.metadata;
 
 import com.google.common.base.Optional;
 import io.druid.indexer.TaskInfo;
-import io.druid.java.util.common.Pair;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -80,27 +79,6 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
   Optional<StatusType> getStatus(String entryId);
 
   /**
-   * Return all active entries with their respective status
-   *
-   * @return list of (entry, status) pairs
-   */
-  List<Pair<EntryType, StatusType>> getActiveEntriesWithStatus();
-
-  default List<StatusType> getInactiveStatusesSince(DateTime timestamp)
-  {
-    return getInactiveStatusesSince(timestamp, null);
-  }
-
-  /**
-   * Return up to {@code maxNumStatuses} statuses for inactive entries created on or later than the given timestamp
-   *
-   * @param timestamp timestamp
-   * @param maxNumStatuses maxNumStatuses
-   * @return list of statuses
-   */
-  List<StatusType> getInactiveStatusesSince(DateTime timestamp, @Nullable Integer maxNumStatuses);
-
-  /**
    * Return up to {@code maxNumStatuses} {@link TaskInfo} objects for all inactive entries
    * created on or later than the given timestamp
    *
@@ -109,7 +87,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
    *
    * @return list of {@link TaskInfo}
    */
-  List<TaskInfo<EntryType>> getCompletedTaskInfo(
+  List<TaskInfo<EntryType, StatusType>> getCompletedTaskInfo(
       DateTime timestamp,
       @Nullable Integer maxNumStatuses,
       @Nullable String datasource
@@ -120,15 +98,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
    *
    * @return list of {@link TaskInfo}
    */
-  List<TaskInfo<EntryType>> getActiveTaskInfo(@Nullable String dataSource);
-
-  /**
-   * Return createdDate and dataSource for the given id
-   *
-   * @return a pair of createdDate and dataSource or null if an entry for the given id is not found
-   */
-  @Nullable
-  Pair<DateTime, String> getCreatedDateAndDataSource(String entryId);
+  List<TaskInfo<EntryType, StatusType>> getActiveTaskInfo(@Nullable String dataSource);
 
   /**
    * Add a lock to the given entry
diff --git a/extensions-contrib/sqlserver-metadata-storage/src/test/java/io/druid/metadata/storage/sqlserver/CustomStatementRewriterTest.java b/extensions-contrib/sqlserver-metadata-storage/src/test/java/io/druid/metadata/storage/sqlserver/CustomStatementRewriterTest.java
index 6f922af..ea9394a 100644
--- a/extensions-contrib/sqlserver-metadata-storage/src/test/java/io/druid/metadata/storage/sqlserver/CustomStatementRewriterTest.java
+++ b/extensions-contrib/sqlserver-metadata-storage/src/test/java/io/druid/metadata/storage/sqlserver/CustomStatementRewriterTest.java
@@ -139,19 +139,4 @@ public class CustomStatementRewriterTest
         rewrite("UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = TRUE"));
 
   }
-
-  /**
-   *
-   * @see io.druid.metadata.SQLMetadataStorageActionHandler#getInactiveStatusesSince(org.joda.time.DateTime)
-   *
-   */
-  @Test
-  public void testSQLMetadataStorageActionHandlerGetInactiveStatusesSince()
-  {
-    Assert.assertEquals(
-        "SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= ? ORDER BY created_date DESC",
-        rewrite(
-            "SELECT id, status_payload FROM %s WHERE active = FALSE AND created_date >= :start ORDER BY created_date DESC"));
-  }
-
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
index 341cf8a..db3a873 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -35,7 +35,6 @@ import io.druid.indexing.common.actions.TaskAction;
 import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.java.util.common.DateTimes;
-import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.metadata.EntryExistsException;
 import org.joda.time.DateTime;
@@ -170,12 +169,12 @@ public class HeapMemoryTaskStorage implements TaskStorage
   }
 
   @Override
-  public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
+  public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
   {
     giant.lock();
 
     try {
-      final ImmutableList.Builder<TaskInfo<Task>> listBuilder = ImmutableList.builder();
+      final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
       for (final TaskStuff taskStuff : tasks.values()) {
         if (taskStuff.getStatus().isRunnable()) {
           TaskInfo t = new TaskInfo(
@@ -196,7 +195,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
   }
 
   @Override
-  public List<TaskInfo<Task>> getRecentlyFinishedTaskInfo(
+  public List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
       @Nullable Integer maxTaskStatuses, @Nullable Duration duration, @Nullable String datasource
   )
   {
@@ -224,7 +223,10 @@ public class HeapMemoryTaskStorage implements TaskStorage
     }
   }
 
-  private List<TaskInfo<Task>> getRecentlyFinishedTaskInfoSince(DateTime start, Ordering<TaskStuff> createdDateDesc)
+  private List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfoSince(
+      DateTime start,
+      Ordering<TaskStuff> createdDateDesc
+  )
   {
     giant.lock();
 
@@ -234,7 +236,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
           .stream()
           .filter(taskStuff -> taskStuff.getStatus().isComplete())
           .collect(Collectors.toList());
-      final ImmutableList.Builder<TaskInfo<Task>> listBuilder = ImmutableList.builder();
+      final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
       for (final TaskStuff taskStuff : list) {
         String id = taskStuff.getTask().getId();
         TaskInfo t = new TaskInfo(
@@ -253,7 +255,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
     }
   }
 
-  private List<TaskInfo<Task>> getNRecentlyFinishedTaskInfo(int n, Ordering<TaskStuff> createdDateDesc)
+  private List<TaskInfo<Task, TaskStatus>> getNRecentlyFinishedTaskInfo(int n, Ordering<TaskStuff> createdDateDesc)
   {
     giant.lock();
 
@@ -263,10 +265,10 @@ public class HeapMemoryTaskStorage implements TaskStorage
           .stream()
           .limit(n)
           .collect(Collectors.toList());
-      final ImmutableList.Builder<TaskInfo<Task>> listBuilder = ImmutableList.builder();
+      final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
       for (final TaskStuff taskStuff : list) {
         String id = taskStuff.getTask().getId();
-        TaskInfo t = new TaskInfo(
+        TaskInfo t = new TaskInfo<>(
             id,
             taskStuff.getCreatedDate(),
             taskStuff.getStatus(),
@@ -282,21 +284,6 @@ public class HeapMemoryTaskStorage implements TaskStorage
     }
   }
 
-  @Nullable
-  @Override
-  public Pair<DateTime, String> getCreatedDateTimeAndDataSource(String taskId)
-  {
-    giant.lock();
-
-    try {
-      final TaskStuff taskStuff = tasks.get(taskId);
-      return taskStuff == null ? null : Pair.of(taskStuff.getCreatedDate(), taskStuff.getDataSource());
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
   @Override
   public void addLock(final String taskid, final TaskLock taskLock)
   {
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageAdapter.java b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageAdapter.java
index 3a097e9..a904340 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageAdapter.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageAdapter.java
@@ -21,6 +21,7 @@ package io.druid.indexing.overlord;
 
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
+import io.druid.indexer.TaskInfo;
 import io.druid.java.util.common.DateTimes;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -47,13 +48,9 @@ public class IndexerMetadataStorageAdapter
   {
     // Check the given interval overlaps the interval(minCreatedDateOfActiveTasks, MAX)
     final Optional<DateTime> minCreatedDateOfActiveTasks = taskStorageQueryAdapter
-        .getActiveTasks()
+        .getActiveTaskInfo(dataSource)
         .stream()
-        .map(task -> Preconditions.checkNotNull(
-            taskStorageQueryAdapter.getCreatedTime(task.getId()),
-            "Can't find the createdTime for task[%s]",
-            task.getId()
-        ))
+        .map(TaskInfo::getCreatedTime)
         .min(Comparator.naturalOrder());
 
     final Interval activeTaskInterval = new Interval(
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
index b927497..28f6bbc 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -36,7 +35,6 @@ import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.lifecycle.LifecycleStart;
 import io.druid.java.util.common.lifecycle.LifecycleStop;
 import io.druid.java.util.emitter.EmittingLogger;
@@ -46,12 +44,12 @@ import io.druid.metadata.MetadataStorageActionHandlerFactory;
 import io.druid.metadata.MetadataStorageActionHandlerTypes;
 import io.druid.metadata.MetadataStorageConnector;
 import io.druid.metadata.MetadataStorageTablesConfig;
-import org.joda.time.DateTime;
 import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class MetadataTaskStorage implements TaskStorage
 {
@@ -185,36 +183,15 @@ public class MetadataTaskStorage implements TaskStorage
   @Override
   public List<Task> getActiveTasks()
   {
-    return ImmutableList.copyOf(
-        Iterables.transform(
-            Iterables.filter(
-                handler.getActiveEntriesWithStatus(),
-                new Predicate<Pair<Task, TaskStatus>>()
-                {
-                  @Override
-                  public boolean apply(
-                      @Nullable Pair<Task, TaskStatus> input
-                  )
-                  {
-                    return input.rhs.isRunnable();
-                  }
-                }
-            ),
-            new Function<Pair<Task, TaskStatus>, Task>()
-            {
-              @Nullable
-              @Override
-              public Task apply(@Nullable Pair<Task, TaskStatus> input)
-              {
-                return input.lhs;
-              }
-            }
-        )
-    );
+    return handler.getActiveTaskInfo(null)
+           .stream()
+           .filter(taskInfo -> taskInfo.getStatus().isRunnable())
+           .map(TaskInfo::getTask)
+           .collect(Collectors.toList());
   }
 
   @Override
-  public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
+  public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
   {
     return ImmutableList.copyOf(
         handler.getActiveTaskInfo(dataSource)
@@ -222,7 +199,7 @@ public class MetadataTaskStorage implements TaskStorage
   }
 
   @Override
-  public List<TaskInfo<Task>> getRecentlyFinishedTaskInfo(
+  public List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
       @Nullable Integer maxTaskStatuses,
       @Nullable Duration duration,
       @Nullable String datasource
@@ -237,13 +214,6 @@ public class MetadataTaskStorage implements TaskStorage
     );
   }
 
-  @Nullable
-  @Override
-  public Pair<DateTime, String> getCreatedDateTimeAndDataSource(String taskId)
-  {
-    return handler.getCreatedDateAndDataSource(taskId);
-  }
-
   @Override
   public void addLock(final String taskid, final TaskLock taskLock)
   {
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
index 141e08d..89849e5 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
@@ -25,9 +25,7 @@ import io.druid.indexer.TaskStatus;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.actions.TaskAction;
 import io.druid.indexing.common.task.Task;
-import io.druid.java.util.common.Pair;
 import io.druid.metadata.EntryExistsException;
-import org.joda.time.DateTime;
 import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
@@ -131,7 +129,7 @@ public interface TaskStorage
    *
    * @return list of {@link TaskInfo}
    */
-  List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource);
+  List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource);
 
   /**
    * Returns up to {@code maxTaskStatuses} {@link TaskInfo} objects of recently finished tasks as stored in the storage facility. No
@@ -144,15 +142,12 @@ public interface TaskStorage
    *
    * @return list of {@link TaskInfo}
    */
-  List<TaskInfo<Task>> getRecentlyFinishedTaskInfo(
+  List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
       @Nullable Integer maxTaskStatuses,
       @Nullable Duration duration,
       @Nullable String datasource
   );
 
-  @Nullable
-  Pair<DateTime, String> getCreatedDateTimeAndDataSource(String taskId);
-
   /**
    * Returns a list of locks for a particular task.
    *
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java
index c1cd2b3..da7dd71 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java
@@ -28,9 +28,7 @@ import io.druid.indexing.common.actions.SegmentInsertAction;
 import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import io.druid.indexing.common.actions.TaskAction;
 import io.druid.indexing.common.task.Task;
-import io.druid.java.util.common.Pair;
 import io.druid.timeline.DataSegment;
-import org.joda.time.DateTime;
 import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
@@ -55,12 +53,12 @@ public class TaskStorageQueryAdapter
     return storage.getActiveTasks();
   }
 
-  public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
+  public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
   {
     return storage.getActiveTaskInfo(dataSource);
   }
 
-  public List<TaskInfo<Task>> getRecentlyCompletedTaskInfo(
+  public List<TaskInfo<Task, TaskStatus>> getRecentlyCompletedTaskInfo(
       @Nullable Integer maxTaskStatuses,
       @Nullable Duration duration,
       @Nullable String dataSource
@@ -69,13 +67,6 @@ public class TaskStorageQueryAdapter
     return storage.getRecentlyFinishedTaskInfo(maxTaskStatuses, duration, dataSource);
   }
 
-  @Nullable
-  public DateTime getCreatedTime(String taskId)
-  {
-    final Pair<DateTime, String> pair = storage.getCreatedDateTimeAndDataSource(taskId);
-    return pair == null ? null : pair.lhs;
-  }
-
   public Optional<Task> getTask(final String taskid)
   {
     return storage.getTask(taskid);
@@ -108,9 +99,4 @@ public class TaskStorageQueryAdapter
     }
     return segments;
   }
-  
-  public Pair<DateTime, String> getCreatedDateAndDataSource(String taskId)
-  {
-    return storage.getCreatedDateTimeAndDataSource(taskId);
-  }
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index 95639db..6589e0c 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -606,7 +606,7 @@ public class OverlordResource
         null
     );
 
-    Function<TaskInfo<Task>, TaskStatusPlus> completeTaskTransformFunc = taskInfo -> new TaskStatusPlus(
+    Function<TaskInfo<Task, TaskStatus>, TaskStatusPlus> completeTaskTransformFunc = taskInfo -> new TaskStatusPlus(
         taskInfo.getId(),
         taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
         taskInfo.getCreatedTime(),
@@ -628,18 +628,18 @@ public class OverlordResource
         final Interval theInterval = Intervals.of(interval.replace("_", "/"));
         duration = theInterval.toDuration();
       }
-      final List<TaskInfo<Task>> taskInfoList = taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(
+      final List<TaskInfo<Task, TaskStatus>> taskInfoList = taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(
           maxCompletedTasks, duration, dataSource
       );
       final List<TaskStatusPlus> completedTasks = Lists.transform(taskInfoList, completeTaskTransformFunc);
       finalTaskList.addAll(completedTasks);
     }
 
-    final List<TaskInfo<Task>> allActiveTaskInfo;
+    final List<TaskInfo<Task, TaskStatus>> allActiveTaskInfo;
     final List<AnyTask> allActiveTasks = Lists.newArrayList();
     if (state == null || !"complete".equals(StringUtils.toLowerCase(state))) {
       allActiveTaskInfo = taskStorageQueryAdapter.getActiveTaskInfo(dataSource);
-      for (final TaskInfo<Task> task : allActiveTaskInfo) {
+      for (final TaskInfo<Task, TaskStatus> task : allActiveTaskInfo) {
         allActiveTasks.add(
             new AnyTask(
                 task.getId(),
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java
index 72644aa..0d42c60 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java
@@ -20,7 +20,10 @@
 package io.druid.indexing.overlord;
 
 import com.google.common.collect.ImmutableList;
+import io.druid.indexer.TaskInfo;
+import io.druid.indexer.TaskStatus;
 import io.druid.indexing.common.task.NoopTask;
+import io.druid.indexing.common.task.Task;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
 import org.easymock.EasyMock;
@@ -32,6 +35,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.util.List;
+
 public class IndexerMetadataStorageAdapterTest
 {
   @Rule
@@ -55,12 +60,23 @@ public class IndexerMetadataStorageAdapterTest
   @Test
   public void testDeletePendingSegments()
   {
-    EasyMock.expect(taskStorageQueryAdapter.getActiveTasks())
-            .andReturn(ImmutableList.of(NoopTask.create("id1", 0), NoopTask.create("id2", 0)));
-    EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.eq("id1")))
-            .andReturn(DateTimes.of("2017-12-01"));
-    EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.eq("id2")))
-            .andReturn(DateTimes.of("2017-12-02"));
+    final List<TaskInfo<Task, TaskStatus>> taskInfos = ImmutableList.of(
+        new TaskInfo<>(
+            "id1",
+            DateTimes.of("2017-12-01"),
+            TaskStatus.running("id1"),
+            "dataSource",
+            NoopTask.create("id1", 0)
+        ),
+        new TaskInfo<>(
+            "id1",
+            DateTimes.of("2017-12-02"),
+            TaskStatus.running("id2"),
+            "dataSource",
+            NoopTask.create("id2", 0)
+        )
+    );
+    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
 
     final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01");
     EasyMock
@@ -76,12 +92,24 @@ public class IndexerMetadataStorageAdapterTest
   @Test
   public void testDeletePendingSegmentsOfRunningTasks()
   {
-    EasyMock.expect(taskStorageQueryAdapter.getActiveTasks())
-            .andReturn(ImmutableList.of(NoopTask.create("id1", 0), NoopTask.create("id2", 0)));
-    EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.eq("id1")))
-            .andReturn(DateTimes.of("2017-11-01"));
-    EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.eq("id2")))
-            .andReturn(DateTimes.of("2017-12-02"));
+    final ImmutableList<TaskInfo<Task, TaskStatus>> taskInfos = ImmutableList.of(
+        new TaskInfo<>(
+            "id1",
+            DateTimes.of("2017-11-01"),
+            TaskStatus.running("id1"),
+            "dataSource",
+            NoopTask.create("id1", 0)
+        ),
+        new TaskInfo<>(
+            "id1",
+            DateTimes.of("2017-12-02"),
+            TaskStatus.running("id2"),
+            "dataSource",
+            NoopTask.create("id2", 0)
+        )
+    );
+
+    EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
 
     final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01");
     EasyMock
diff --git a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
index 6986cf7..42986c5 100644
--- a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
@@ -47,7 +47,7 @@ public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, L
   }
 
   @Override
-  protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
+  protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
       Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
   )
   {
diff --git a/server/src/main/java/io/druid/metadata/MySQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/MySQLMetadataStorageActionHandler.java
index 32d2fdb..69af28f 100644
--- a/server/src/main/java/io/druid/metadata/MySQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/MySQLMetadataStorageActionHandler.java
@@ -45,7 +45,7 @@ public class MySQLMetadataStorageActionHandler<EntryType, StatusType, LogType, L
   }
 
   @Override
-  protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
+  protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
       Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
   )
   {
diff --git a/server/src/main/java/io/druid/metadata/PostgreSQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
index 08cb91c..310d48d 100644
--- a/server/src/main/java/io/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
@@ -45,7 +45,7 @@ public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogTy
   }
 
   @Override
-  protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
+  protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
       Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
   )
   {
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
index 306816b..f940fe3 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -27,7 +27,6 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.druid.indexer.TaskInfo;
-import io.druid.indexer.TaskStatus;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.StringUtils;
@@ -237,88 +236,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
   }
 
   @Override
-  public List<Pair<EntryType, StatusType>> getActiveEntriesWithStatus()
-  {
-    return connector.retryWithHandle(
-        new HandleCallback<List<Pair<EntryType, StatusType>>>()
-        {
-          @Override
-          public List<Pair<EntryType, StatusType>> withHandle(Handle handle)
-          {
-            return handle
-                .createQuery(
-                    StringUtils.format(
-                        "SELECT id, payload, status_payload FROM %s WHERE active = TRUE ORDER BY created_date",
-                        entryTable
-                    )
-                )
-                .map(
-                    new ResultSetMapper<Pair<EntryType, StatusType>>()
-                    {
-                      @Override
-                      public Pair<EntryType, StatusType> map(int index, ResultSet r, StatementContext ctx)
-                          throws SQLException
-                      {
-                        try {
-                          return Pair.of(
-                              jsonMapper.readValue(
-                                  r.getBytes("payload"),
-                                  entryType
-                              ),
-                              jsonMapper.readValue(
-                                  r.getBytes("status_payload"),
-                                  statusType
-                              )
-                          );
-                        }
-                        catch (IOException e) {
-                          log.makeAlert(e, "Failed to parse entry payload").addData("entry", r.getString("id")).emit();
-                          throw new SQLException(e);
-                        }
-                      }
-                    }
-                ).list();
-          }
-        }
-    );
-
-  }
-
-  @Override
-  public List<StatusType> getInactiveStatusesSince(DateTime timestamp, @Nullable Integer maxNumStatuses)
-  {
-    return getConnector().retryWithHandle(
-        handle -> {
-          final Query<Map<String, Object>> query = createInactiveStatusesSinceQuery(
-              handle,
-              timestamp,
-              maxNumStatuses,
-              null
-          );
-
-          return query
-              .map(
-                  (ResultSetMapper<StatusType>) (index, r, ctx) -> {
-                    try {
-                      return getJsonMapper().readValue(
-                          r.getBytes("status_payload"),
-                          getStatusType()
-                      );
-                    }
-                    catch (IOException e) {
-                      log.makeAlert(e, "Failed to parse status payload")
-                         .addData("entry", r.getString("id"))
-                         .emit();
-                      throw new SQLException(e);
-                    }
-                  }
-              ).list();
-        }
-    );
-  }
-
-  @Override
-  public List<TaskInfo<EntryType>> getCompletedTaskInfo(
+  public List<TaskInfo<EntryType, StatusType>> getCompletedTaskInfo(
       DateTime timestamp,
       @Nullable Integer maxNumStatuses,
       @Nullable String dataSource
@@ -326,7 +244,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
   {
     return getConnector().retryWithHandle(
         handle -> {
-          final Query<Map<String, Object>> query = createInactiveStatusesSinceQuery(
+          final Query<Map<String, Object>> query = createCompletedTaskInfoQuery(
               handle,
               timestamp,
               maxNumStatuses,
@@ -338,11 +256,11 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
   }
 
   @Override
-  public List<TaskInfo<EntryType>> getActiveTaskInfo(@Nullable String dataSource)
+  public List<TaskInfo<EntryType, StatusType>> getActiveTaskInfo(@Nullable String dataSource)
   {
     return getConnector().retryWithHandle(
         handle -> {
-          final Query<Map<String, Object>> query = createActiveStatusesQuery(
+          final Query<Map<String, Object>> query = createActiveTaskInfoQuery(
               handle,
               dataSource
           );
@@ -351,7 +269,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
     );
   }
 
-  private Query<Map<String, Object>> createActiveStatusesQuery(Handle handle, @Nullable String dataSource)
+  private Query<Map<String, Object>> createActiveTaskInfoQuery(Handle handle, @Nullable String dataSource)
   {
     String sql = StringUtils.format(
         "SELECT "
@@ -384,14 +302,14 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
     return sql;
   }
 
-  class TaskInfoMapper implements ResultSetMapper<TaskInfo<EntryType>>
+  class TaskInfoMapper implements ResultSetMapper<TaskInfo<EntryType, StatusType>>
   {
     @Override
-    public TaskInfo<EntryType> map(int index, ResultSet resultSet, StatementContext context) throws SQLException
+    public TaskInfo<EntryType, StatusType> map(int index, ResultSet resultSet, StatementContext context) throws SQLException
     {
-      final TaskInfo<EntryType> taskInfo;
+      final TaskInfo<EntryType, StatusType> taskInfo;
       EntryType task;
-      TaskStatus status;
+      StatusType status;
       try {
         task = getJsonMapper().readValue(resultSet.getBytes("payload"), getEntryType());
       }
@@ -417,7 +335,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
     }
   }
 
-  protected abstract Query<Map<String, Object>> createInactiveStatusesSinceQuery(
+  protected abstract Query<Map<String, Object>> createCompletedTaskInfoQuery(
       Handle handle,
       DateTime timestamp,
       @Nullable Integer maxNumStatuses,
@@ -425,28 +343,6 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
   );
 
   @Override
-  @Nullable
-  public Pair<DateTime, String> getCreatedDateAndDataSource(String entryId)
-  {
-    return connector.retryWithHandle(
-        handle -> handle
-        .createQuery(
-            StringUtils.format(
-                "SELECT created_date, datasource FROM %s WHERE id = :entryId",
-                entryTable
-            )
-        )
-        .bind("entryId", entryId)
-        .map(
-            (index, resultSet, ctx) -> Pair.of(
-                DateTimes.of(resultSet.getString("created_date")), resultSet.getString("datasource")
-            )
-        )
-        .first()
-    );
-  }
-
-  @Override
   public boolean addLock(final String entryId, final LockType lock)
   {
     return connector.retryWithHandle(
diff --git a/server/src/main/java/io/druid/metadata/SQLServerMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLServerMetadataStorageActionHandler.java
index 6fd354e..fe6f922 100644
--- a/server/src/main/java/io/druid/metadata/SQLServerMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/SQLServerMetadataStorageActionHandler.java
@@ -45,7 +45,7 @@ public class SQLServerMetadataStorageActionHandler<EntryType, StatusType, LogTyp
   }
 
   @Override
-  protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
+  protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
       Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
   )
   {
diff --git a/server/src/test/java/io/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/io/druid/metadata/SQLMetadataStorageActionHandlerTest.java
index e6c4b8e..1487fa5 100644
--- a/server/src/test/java/io/druid/metadata/SQLMetadataStorageActionHandlerTest.java
+++ b/server/src/test/java/io/druid/metadata/SQLMetadataStorageActionHandlerTest.java
@@ -25,6 +25,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import io.druid.indexer.TaskInfo;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Pair;
@@ -39,6 +40,7 @@ import org.junit.rules.ExpectedException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class SQLMetadataStorageActionHandlerTest
 {
@@ -133,19 +135,23 @@ public class SQLMetadataStorageActionHandlerTest
 
     Assert.assertEquals(
         ImmutableList.of(Pair.of(entry, status1)),
-        handler.getActiveEntriesWithStatus()
+        handler.getActiveTaskInfo(null).stream()
+               .map(taskInfo -> Pair.of(taskInfo.getTask(), taskInfo.getStatus()))
+               .collect(Collectors.toList())
     );
 
     Assert.assertTrue(handler.setStatus(entryId, true, status2));
 
     Assert.assertEquals(
         ImmutableList.of(Pair.of(entry, status2)),
-        handler.getActiveEntriesWithStatus()
+        handler.getActiveTaskInfo(null).stream()
+               .map(taskInfo -> Pair.of(taskInfo.getTask(), taskInfo.getStatus()))
+               .collect(Collectors.toList())
     );
 
     Assert.assertEquals(
         ImmutableList.of(),
-        handler.getInactiveStatusesSince(DateTimes.of("2014-01-01"))
+        handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null)
     );
 
     Assert.assertTrue(handler.setStatus(entryId, false, status1));
@@ -170,12 +176,15 @@ public class SQLMetadataStorageActionHandlerTest
 
     Assert.assertEquals(
         ImmutableList.of(),
-        handler.getInactiveStatusesSince(DateTimes.of("2014-01-03"))
+        handler.getCompletedTaskInfo(DateTimes.of("2014-01-03"), null, null)
     );
 
     Assert.assertEquals(
         ImmutableList.of(status1),
-        handler.getInactiveStatusesSince(DateTimes.of("2014-01-01"))
+        handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null)
+               .stream()
+               .map(TaskInfo::getStatus)
+               .collect(Collectors.toList())
     );
   }
 
@@ -190,11 +199,15 @@ public class SQLMetadataStorageActionHandlerTest
       handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status);
     }
 
-    final List<Map<String, Integer>> statuses = handler.getInactiveStatusesSince(DateTimes.of("2014-01-01"), 7);
+    final List<TaskInfo<Map<String, Integer>, Map<String, Integer>>> statuses = handler.getCompletedTaskInfo(
+        DateTimes.of("2014-01-01"),
+        7,
+        null
+    );
     Assert.assertEquals(7, statuses.size());
     int i = 10;
-    for (Map<String, Integer> status : statuses) {
-      Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status);
+    for (TaskInfo<Map<String, Integer>, Map<String, Integer>> status : statuses) {
+      Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status.getStatus());
     }
   }
 
@@ -209,11 +222,15 @@ public class SQLMetadataStorageActionHandlerTest
       handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status);
     }
 
-    final List<Map<String, Integer>> statuses = handler.getInactiveStatusesSince(DateTimes.of("2014-01-01"), 10);
+    final List<TaskInfo<Map<String, Integer>, Map<String, Integer>>> statuses = handler.getCompletedTaskInfo(
+        DateTimes.of("2014-01-01"),
+        10,
+        null
+    );
     Assert.assertEquals(5, statuses.size());
     int i = 5;
-    for (Map<String, Integer> status : statuses) {
-      Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status);
+    for (TaskInfo<Map<String, Integer>, Map<String, Integer>> status : statuses) {
+      Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status.getStatus());
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org