You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/04/24 22:14:08 UTC

[incubator-druid] 07/20: Fix two issues with Coordinator -> Overlord communication. (#7412)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.14.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git

commit 01aa1666dd7615edf458ffeee6d365072349c292
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Thu Apr 4 13:25:18 2019 -0400

    Fix two issues with Coordinator -> Overlord communication. (#7412)
    
    * Fix two issues with Coordinator -> Overlord communication.
    
    1) ClientCompactQuery needs to recognize the potential for 'interval'
    to be set instead of 'segments'. Without this, 'segments' was null and
    caused a NullPointerException on DruidCoordinatorSegmentCompactor.java:102.
    
    2) In two locations (DruidCoordinatorSegmentCompactor,
    DruidCoordinatorCleanupPendingSegments) the waiting/pending/running
    task lists were being fetched in the wrong order: by fetching
    'running' first and then 'pending', a task could be missed if it
    moved from 'pending' to 'running' in between the two calls. Replaced
    these calls with 'getActiveTasks', a new method that fetches waiting,
    then pending, then running, and de-duplicates the results by task ID.
    
    * Remove unused import.
---
 .../druid/client/indexing/ClientCompactQuery.java  | 47 +++++++++++++++++++++-
 .../client/indexing/HttpIndexingServiceClient.java | 43 +++++++++++++-------
 .../client/indexing/IndexingServiceClient.java     |  9 ++---
 .../DruidCoordinatorCleanupPendingSegments.java    | 20 +--------
 .../helper/DruidCoordinatorSegmentCompactor.java   | 38 ++++++++---------
 .../client/indexing/NoopIndexingServiceClient.java | 17 ++------
 .../DruidCoordinatorSegmentCompactorTest.java      | 14 +------
 7 files changed, 103 insertions(+), 85 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index f32b11d..b4bbd1a 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -22,15 +22,18 @@ package org.apache.druid.client.indexing;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class ClientCompactQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
+  private final Interval interval;
   private final boolean keepSegmentGranularity;
   @Nullable
   private final Long targetCompactionSizeBytes;
@@ -40,7 +43,8 @@ public class ClientCompactQuery implements ClientQuery
   @JsonCreator
   public ClientCompactQuery(
       @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("segments") List<DataSegment> segments,
+      @Nullable @JsonProperty("interval") final Interval interval,
+      @Nullable @JsonProperty("segments") final List<DataSegment> segments,
       @JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
       @JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
       @JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
@@ -49,6 +53,7 @@ public class ClientCompactQuery implements ClientQuery
   {
     this.dataSource = dataSource;
     this.segments = segments;
+    this.interval = interval;
     this.keepSegmentGranularity = keepSegmentGranularity;
     this.targetCompactionSizeBytes = targetCompactionSizeBytes;
     this.tuningConfig = tuningConfig;
@@ -76,6 +81,12 @@ public class ClientCompactQuery implements ClientQuery
   }
 
   @JsonProperty
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  @JsonProperty
   public boolean isKeepSegmentGranularity()
   {
     return keepSegmentGranularity;
@@ -101,11 +112,45 @@ public class ClientCompactQuery implements ClientQuery
   }
 
   @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ClientCompactQuery that = (ClientCompactQuery) o;
+    return keepSegmentGranularity == that.keepSegmentGranularity &&
+           Objects.equals(dataSource, that.dataSource) &&
+           Objects.equals(segments, that.segments) &&
+           Objects.equals(interval, that.interval) &&
+           Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
+           Objects.equals(tuningConfig, that.tuningConfig) &&
+           Objects.equals(context, that.context);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(
+        dataSource,
+        segments,
+        interval,
+        keepSegmentGranularity,
+        targetCompactionSizeBytes,
+        tuningConfig,
+        context
+    );
+  }
+
+  @Override
   public String toString()
   {
     return "ClientCompactQuery{" +
            "dataSource='" + dataSource + '\'' +
            ", segments=" + segments +
+           ", interval=" + interval +
            ", keepSegmentGranularity=" + keepSegmentGranularity +
            ", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
            ", tuningConfig=" + tuningConfig +
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 2d8c4dc..8cda53c 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.indexer.TaskStatusPlus;
@@ -42,11 +42,14 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class HttpIndexingServiceClient implements IndexingServiceClient
 {
@@ -112,6 +115,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
     return runTask(
         new ClientCompactQuery(
             dataSource,
+            null,
             segments,
             keepSegmentGranularity,
             targetCompactionSizeBytes,
@@ -150,7 +154,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
       return Preconditions.checkNotNull(taskId, "Null task id for task[%s]", taskObject);
     }
     catch (Exception e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 
@@ -184,7 +188,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
       return killedTaskId;
     }
     catch (Exception e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 
@@ -217,21 +221,30 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   }
 
   @Override
-  public List<TaskStatusPlus> getRunningTasks()
+  public List<TaskStatusPlus> getActiveTasks()
   {
-    return getTasks("runningTasks");
-  }
+    // Must retrieve waiting, then pending, then running, so if tasks move from one state to the next between
+    // calls then we still catch them. (Tasks always go waiting -> pending -> running.)
+    //
+    // Consider switching to new-style /druid/indexer/v1/tasks API in the future.
+    final List<TaskStatusPlus> tasks = new ArrayList<>();
+    final Set<String> taskIdsSeen = new HashSet<>();
+
+    final Iterable<TaskStatusPlus> activeTasks = Iterables.concat(
+        getTasks("waitingTasks"),
+        getTasks("pendingTasks"),
+        getTasks("runningTasks")
+    );
 
-  @Override
-  public List<TaskStatusPlus> getPendingTasks()
-  {
-    return getTasks("pendingTasks");
-  }
+    for (TaskStatusPlus task : activeTasks) {
+      // Use taskIdsSeen to prevent returning the same task ID more than once (if it hops from 'pending' to 'running',
+      // for example, and we see it twice.)
+      if (taskIdsSeen.add(task.getId())) {
+        tasks.add(task);
+      }
+    }
 
-  @Override
-  public List<TaskStatusPlus> getWaitingTasks()
-  {
-    return getTasks("waitingTasks");
+    return tasks;
   }
 
   private List<TaskStatusPlus> getTasks(String endpointSuffix)
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 905b810..4a2c0af 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -51,11 +51,10 @@ public interface IndexingServiceClient
 
   String killTask(String taskId);
 
-  List<TaskStatusPlus> getRunningTasks();
-
-  List<TaskStatusPlus> getPendingTasks();
-
-  List<TaskStatusPlus> getWaitingTasks();
+  /**
+   * Gets all tasks that are waiting, pending, or running.
+   */
+  List<TaskStatusPlus> getActiveTasks();
 
   TaskStatusResponse getTaskStatus(String taskId);
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java
index a25ea07..bf4ffde 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java
@@ -52,27 +52,11 @@ public class DruidCoordinatorCleanupPendingSegments implements DruidCoordinatorH
     final List<DateTime> createdTimes = new ArrayList<>();
     createdTimes.add(
         indexingServiceClient
-            .getRunningTasks()
+            .getActiveTasks()
             .stream()
             .map(TaskStatusPlus::getCreatedTime)
             .min(Comparators.naturalNullsFirst())
-            .orElse(DateTimes.nowUtc()) // If there is no running tasks, this returns the current time.
-    );
-    createdTimes.add(
-        indexingServiceClient
-            .getPendingTasks()
-            .stream()
-            .map(TaskStatusPlus::getCreatedTime)
-            .min(Comparators.naturalNullsFirst())
-            .orElse(DateTimes.nowUtc()) // If there is no pending tasks, this returns the current time.
-    );
-    createdTimes.add(
-        indexingServiceClient
-            .getWaitingTasks()
-            .stream()
-            .map(TaskStatusPlus::getCreatedTime)
-            .min(Comparators.naturalNullsFirst())
-            .orElse(DateTimes.nowUtc()) // If there is no waiting tasks, this returns the current time.
+            .orElse(DateTimes.nowUtc()) // If there are no active tasks, this returns the current time.
     );
 
     final TaskStatusPlus completeTaskStatus = indexingServiceClient.getLastCompleteTask();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 0258cd0..d22f92a 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -41,7 +41,6 @@ import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -84,11 +83,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
         Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
             .stream()
             .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
-        final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(
-            indexingServiceClient.getRunningTasks(),
-            indexingServiceClient.getPendingTasks(),
-            indexingServiceClient.getWaitingTasks()
-        );
+        final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(indexingServiceClient.getActiveTasks());
         // dataSource -> list of intervals of compact tasks
         final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size());
         for (TaskStatusPlus status : compactTasks) {
@@ -98,13 +93,22 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
           }
           if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
             final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
-            final Interval interval = JodaUtils.umbrellaInterval(
-                compactQuery.getSegments()
-                            .stream()
-                            .map(DataSegment::getInterval)
-                            .sorted(Comparators.intervalsByStartThenEnd())
-                            .collect(Collectors.toList())
-            );
+            final Interval interval;
+
+            if (compactQuery.getSegments() != null) {
+              interval = JodaUtils.umbrellaInterval(
+                  compactQuery.getSegments()
+                              .stream()
+                              .map(DataSegment::getInterval)
+                              .sorted(Comparators.intervalsByStartThenEnd())
+                              .collect(Collectors.toList())
+              );
+            } else if (compactQuery.getInterval() != null) {
+              interval = compactQuery.getInterval();
+            } else {
+              throw new ISE("task[%s] has neither 'segments' nor 'interval'", status.getId());
+            }
+
             compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
           } else {
             throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
@@ -146,13 +150,9 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
                  .build();
   }
 
-  @SafeVarargs
-  private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
+  private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus> taskStatuses)
   {
-    final List<TaskStatusPlus> allTaskStatusPlus = new ArrayList<>();
-    Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll);
-
-    return allTaskStatusPlus
+    return taskStatuses
         .stream()
         .filter(status -> {
           final String taskType = status.getType();
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 91e1a8a..8ef5667 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -25,6 +25,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -80,21 +81,9 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
   }
 
   @Override
-  public List<TaskStatusPlus> getRunningTasks()
+  public List<TaskStatusPlus> getActiveTasks()
   {
-    return null;
-  }
-
-  @Override
-  public List<TaskStatusPlus> getPendingTasks()
-  {
-    return null;
-  }
-
-  @Override
-  public List<TaskStatusPlus> getWaitingTasks()
-  {
-    return null;
+    return Collections.emptyList();
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
index 52b78e2..66b73a7 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
@@ -107,19 +107,7 @@ public class DruidCoordinatorSegmentCompactorTest
     }
 
     @Override
-    public List<TaskStatusPlus> getRunningTasks()
-    {
-      return Collections.emptyList();
-    }
-
-    @Override
-    public List<TaskStatusPlus> getPendingTasks()
-    {
-      return Collections.emptyList();
-    }
-
-    @Override
-    public List<TaskStatusPlus> getWaitingTasks()
+    public List<TaskStatusPlus> getActiveTasks()
     {
       return Collections.emptyList();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org