You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/04/24 22:14:08 UTC
[incubator-druid] 07/20: Fix two issues with Coordinator -> Overlord communication. (#7412)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.14.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit 01aa1666dd7615edf458ffeee6d365072349c292
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Thu Apr 4 13:25:18 2019 -0400
Fix two issues with Coordinator -> Overlord communication. (#7412)
* Fix two issues with Coordinator -> Overlord communication.
1) ClientCompactQuery needs to recognize that 'interval' may be set
instead of 'segments'. Without this, a NullPointerException was
thrown at DruidCoordinatorSegmentCompactor.java:102.
2) In two locations (DruidCoordinatorSegmentCompactor,
DruidCoordinatorCleanupPendingSegments) the waiting/pending/running
task lists were being fetched in the wrong order: by checking
'running' first and then 'pending', tasks could be missed if they
moved from 'pending' to 'running' in between the two calls. Replaced
the separate calls with 'getActiveTasks', a new method that fetches
waiting, then pending, then running tasks, and de-duplicates by task ID.
* Remove unused import.
---
.../druid/client/indexing/ClientCompactQuery.java | 47 +++++++++++++++++++++-
.../client/indexing/HttpIndexingServiceClient.java | 43 +++++++++++++-------
.../client/indexing/IndexingServiceClient.java | 9 ++---
.../DruidCoordinatorCleanupPendingSegments.java | 20 +--------
.../helper/DruidCoordinatorSegmentCompactor.java | 38 ++++++++---------
.../client/indexing/NoopIndexingServiceClient.java | 17 ++------
.../DruidCoordinatorSegmentCompactorTest.java | 14 +------
7 files changed, 103 insertions(+), 85 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index f32b11d..b4bbd1a 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -22,15 +22,18 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class ClientCompactQuery implements ClientQuery
{
private final String dataSource;
private final List<DataSegment> segments;
+ private final Interval interval;
private final boolean keepSegmentGranularity;
@Nullable
private final Long targetCompactionSizeBytes;
@@ -40,7 +43,8 @@ public class ClientCompactQuery implements ClientQuery
@JsonCreator
public ClientCompactQuery(
@JsonProperty("dataSource") String dataSource,
- @JsonProperty("segments") List<DataSegment> segments,
+ @Nullable @JsonProperty("interval") final Interval interval,
+ @Nullable @JsonProperty("segments") final List<DataSegment> segments,
@JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
@@ -49,6 +53,7 @@ public class ClientCompactQuery implements ClientQuery
{
this.dataSource = dataSource;
this.segments = segments;
+ this.interval = interval;
this.keepSegmentGranularity = keepSegmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
@@ -76,6 +81,12 @@ public class ClientCompactQuery implements ClientQuery
}
@JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ @JsonProperty
public boolean isKeepSegmentGranularity()
{
return keepSegmentGranularity;
@@ -101,11 +112,45 @@ public class ClientCompactQuery implements ClientQuery
}
@Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ClientCompactQuery that = (ClientCompactQuery) o;
+ return keepSegmentGranularity == that.keepSegmentGranularity &&
+ Objects.equals(dataSource, that.dataSource) &&
+ Objects.equals(segments, that.segments) &&
+ Objects.equals(interval, that.interval) &&
+ Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
+ Objects.equals(tuningConfig, that.tuningConfig) &&
+ Objects.equals(context, that.context);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ dataSource,
+ segments,
+ interval,
+ keepSegmentGranularity,
+ targetCompactionSizeBytes,
+ tuningConfig,
+ context
+ );
+ }
+
+ @Override
public String toString()
{
return "ClientCompactQuery{" +
"dataSource='" + dataSource + '\'' +
", segments=" + segments +
+ ", interval=" + interval +
", keepSegmentGranularity=" + keepSegmentGranularity +
", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
", tuningConfig=" + tuningConfig +
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 2d8c4dc..8cda53c 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskStatusPlus;
@@ -42,11 +42,14 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class HttpIndexingServiceClient implements IndexingServiceClient
{
@@ -112,6 +115,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
return runTask(
new ClientCompactQuery(
dataSource,
+ null,
segments,
keepSegmentGranularity,
targetCompactionSizeBytes,
@@ -150,7 +154,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
return Preconditions.checkNotNull(taskId, "Null task id for task[%s]", taskObject);
}
catch (Exception e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@@ -184,7 +188,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
return killedTaskId;
}
catch (Exception e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@@ -217,21 +221,30 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
}
@Override
- public List<TaskStatusPlus> getRunningTasks()
+ public List<TaskStatusPlus> getActiveTasks()
{
- return getTasks("runningTasks");
- }
+ // Must retrieve waiting, then pending, then running, so if tasks move from one state to the next between
+ // calls then we still catch them. (Tasks always go waiting -> pending -> running.)
+ //
+ // Consider switching to new-style /druid/indexer/v1/tasks API in the future.
+ final List<TaskStatusPlus> tasks = new ArrayList<>();
+ final Set<String> taskIdsSeen = new HashSet<>();
+
+ final Iterable<TaskStatusPlus> activeTasks = Iterables.concat(
+ getTasks("waitingTasks"),
+ getTasks("pendingTasks"),
+ getTasks("runningTasks")
+ );
- @Override
- public List<TaskStatusPlus> getPendingTasks()
- {
- return getTasks("pendingTasks");
- }
+ for (TaskStatusPlus task : activeTasks) {
+ // Use taskIdsSeen to prevent returning the same task ID more than once (if it hops from 'pending' to 'running',
+ // for example, and we see it twice.)
+ if (taskIdsSeen.add(task.getId())) {
+ tasks.add(task);
+ }
+ }
- @Override
- public List<TaskStatusPlus> getWaitingTasks()
- {
- return getTasks("waitingTasks");
+ return tasks;
}
private List<TaskStatusPlus> getTasks(String endpointSuffix)
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 905b810..4a2c0af 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -51,11 +51,10 @@ public interface IndexingServiceClient
String killTask(String taskId);
- List<TaskStatusPlus> getRunningTasks();
-
- List<TaskStatusPlus> getPendingTasks();
-
- List<TaskStatusPlus> getWaitingTasks();
+ /**
+ * Gets all tasks that are waiting, pending, or running.
+ */
+ List<TaskStatusPlus> getActiveTasks();
TaskStatusResponse getTaskStatus(String taskId);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java
index a25ea07..bf4ffde 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java
@@ -52,27 +52,11 @@ public class DruidCoordinatorCleanupPendingSegments implements DruidCoordinatorH
final List<DateTime> createdTimes = new ArrayList<>();
createdTimes.add(
indexingServiceClient
- .getRunningTasks()
+ .getActiveTasks()
.stream()
.map(TaskStatusPlus::getCreatedTime)
.min(Comparators.naturalNullsFirst())
- .orElse(DateTimes.nowUtc()) // If there is no running tasks, this returns the current time.
- );
- createdTimes.add(
- indexingServiceClient
- .getPendingTasks()
- .stream()
- .map(TaskStatusPlus::getCreatedTime)
- .min(Comparators.naturalNullsFirst())
- .orElse(DateTimes.nowUtc()) // If there is no pending tasks, this returns the current time.
- );
- createdTimes.add(
- indexingServiceClient
- .getWaitingTasks()
- .stream()
- .map(TaskStatusPlus::getCreatedTime)
- .min(Comparators.naturalNullsFirst())
- .orElse(DateTimes.nowUtc()) // If there is no waiting tasks, this returns the current time.
+ .orElse(DateTimes.nowUtc()) // If there are no active tasks, this returns the current time.
);
final TaskStatusPlus completeTaskStatus = indexingServiceClient.getLastCompleteTask();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 0258cd0..d22f92a 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -41,7 +41,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -84,11 +83,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
.stream()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
- final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(
- indexingServiceClient.getRunningTasks(),
- indexingServiceClient.getPendingTasks(),
- indexingServiceClient.getWaitingTasks()
- );
+ final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(indexingServiceClient.getActiveTasks());
// dataSource -> list of intervals of compact tasks
final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size());
for (TaskStatusPlus status : compactTasks) {
@@ -98,13 +93,22 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
}
if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
- final Interval interval = JodaUtils.umbrellaInterval(
- compactQuery.getSegments()
- .stream()
- .map(DataSegment::getInterval)
- .sorted(Comparators.intervalsByStartThenEnd())
- .collect(Collectors.toList())
- );
+ final Interval interval;
+
+ if (compactQuery.getSegments() != null) {
+ interval = JodaUtils.umbrellaInterval(
+ compactQuery.getSegments()
+ .stream()
+ .map(DataSegment::getInterval)
+ .sorted(Comparators.intervalsByStartThenEnd())
+ .collect(Collectors.toList())
+ );
+ } else if (compactQuery.getInterval() != null) {
+ interval = compactQuery.getInterval();
+ } else {
+ throw new ISE("task[%s] has neither 'segments' nor 'interval'", status.getId());
+ }
+
compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
} else {
throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
@@ -146,13 +150,9 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
.build();
}
- @SafeVarargs
- private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
+ private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus> taskStatuses)
{
- final List<TaskStatusPlus> allTaskStatusPlus = new ArrayList<>();
- Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll);
-
- return allTaskStatusPlus
+ return taskStatuses
.stream()
.filter(status -> {
final String taskType = status.getType();
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 91e1a8a..8ef5667 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -25,6 +25,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -80,21 +81,9 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
}
@Override
- public List<TaskStatusPlus> getRunningTasks()
+ public List<TaskStatusPlus> getActiveTasks()
{
- return null;
- }
-
- @Override
- public List<TaskStatusPlus> getPendingTasks()
- {
- return null;
- }
-
- @Override
- public List<TaskStatusPlus> getWaitingTasks()
- {
- return null;
+ return Collections.emptyList();
}
@Override
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
index 52b78e2..66b73a7 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
@@ -107,19 +107,7 @@ public class DruidCoordinatorSegmentCompactorTest
}
@Override
- public List<TaskStatusPlus> getRunningTasks()
- {
- return Collections.emptyList();
- }
-
- @Override
- public List<TaskStatusPlus> getPendingTasks()
- {
- return Collections.emptyList();
- }
-
- @Override
- public List<TaskStatusPlus> getWaitingTasks()
+ public List<TaskStatusPlus> getActiveTasks()
{
return Collections.emptyList();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org