Posted to commits@druid.apache.org by le...@apache.org on 2019/07/30 16:52:03 UTC
[incubator-druid] branch master updated: Enable Spotbugs: WMI_WRONG_MAP_ITERATOR (#8005)
This is an automated email from the ASF dual-hosted git repository.
leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new e016995 Enable Spotbugs: WMI_WRONG_MAP_ITERATOR (#8005)
e016995 is described below
commit e016995d1fb6085a48e2e0d0a28504ed955f0293
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Tue Jul 30 18:51:53 2019 +0200
Enable Spotbugs: WMI_WRONG_MAP_ITERATOR (#8005)
* WMI_WRONG_MAP_ITERATOR
* Fixed missing loop
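
WMI_WRONG_MAP_ITERATOR flags loops that iterate a map's keySet() and then call get(key) inside the loop body: each get() repeats a hash lookup that iterating entrySet() performs only once. A minimal before/after sketch of the pattern this rule targets (class and variable names here are illustrative, not taken from the patch):

    import java.util.HashMap;
    import java.util.Map;

    public class WmiExample
    {
      public static void main(String[] args)
      {
        final Map<String, Long> counts = new HashMap<>();
        counts.put("a", 1L);
        counts.put("b", 2L);

        // Flagged: every iteration pays for an extra counts.get(key) lookup.
        for (String key : counts.keySet()) {
          System.out.println(key + " -> " + counts.get(key));
        }

        // Preferred: each entry is read once, with no per-key lookup.
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
          System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
      }
    }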
---
codestyle/spotbugs-exclude.xml | 1 -
.../apache/druid/java/util/metrics/KeyedDiff.java | 10 +-
.../MaterializedViewSupervisor.java | 14 +-
.../DataSourceOptimizerMonitor.java | 8 +-
.../druid/indexing/overlord/RemoteTaskRunner.java | 503 ++++++++++-----------
.../overlord/hrtr/HttpRemoteTaskRunner.java | 78 ++--
.../overlord/supervisor/SupervisorManager.java | 4 +-
.../query/lookup/LookupListeningResource.java | 6 +-
8 files changed, 287 insertions(+), 337 deletions(-)
diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml
index eb112b2..ca9fb08 100644
--- a/codestyle/spotbugs-exclude.xml
+++ b/codestyle/spotbugs-exclude.xml
@@ -88,5 +88,4 @@
<Bug pattern="SWL_SLEEP_WITH_LOCK_HELD"/>
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH"/>
<Bug pattern="URF_UNREAD_FIELD"/>
- <Bug pattern="WMI_WRONG_MAP_ITERATOR"/>
</FindBugsFilter>
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/KeyedDiff.java b/core/src/main/java/org/apache/druid/java/util/metrics/KeyedDiff.java
index e4bfd04..c1a3bae 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/KeyedDiff.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/KeyedDiff.java
@@ -41,12 +41,12 @@ public class KeyedDiff
}
}
- public static Map<String, Long> subtract(Map<String, Long> xs, Map<String, Long> ys)
+ public static Map<String, Long> subtract(Map<String, Long> lhs, Map<String, Long> rhs)
{
- assert xs.keySet().equals(ys.keySet());
- final Map<String, Long> zs = new HashMap<String, Long>();
- for (String k : xs.keySet()) {
- zs.put(k, xs.get(k) - ys.get(k));
+ assert lhs.keySet().equals(rhs.keySet());
+ final Map<String, Long> zs = new HashMap<>();
+ for (Map.Entry<String, Long> k : lhs.entrySet()) {
+ zs.put(k.getKey(), k.getValue() - rhs.get(k.getKey()));
}
return zs;
}
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 1ed80ca..378ed50 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -371,16 +371,12 @@ public class MaterializedViewSupervisor implements Supervisor
Map<Interval, String> toDropInterval = new HashMap<>(difference.entriesOnlyOnRight());
// if some intervals are in running tasks and the versions are the same, remove it from toBuildInterval
// if some intervals are in running tasks, but the versions are different, stop the task.
- for (Interval interval : runningVersion.keySet()) {
- if (toBuildInterval.containsKey(interval)
- && toBuildInterval.get(interval).equals(runningVersion.get(interval))
- ) {
+ for (Map.Entry<Interval, String> version : runningVersion.entrySet()) {
+ final Interval interval = version.getKey();
+ final String host = version.getValue();
+ if (toBuildInterval.containsKey(interval) && toBuildInterval.get(interval).equals(host)) {
toBuildInterval.remove(interval);
-
- } else if (
- toBuildInterval.containsKey(interval)
- && !toBuildInterval.get(interval).equals(runningVersion.get(interval))
- ) {
+ } else if (toBuildInterval.containsKey(interval) && !toBuildInterval.get(interval).equals(host)) {
if (taskMaster.getTaskQueue().isPresent()) {
taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId(), "version mismatch");
runningTasks.remove(interval);
diff --git a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizerMonitor.java b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizerMonitor.java
index 0720957..08b0b25 100644
--- a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizerMonitor.java
+++ b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizerMonitor.java
@@ -41,7 +41,7 @@ public class DataSourceOptimizerMonitor extends AbstractMonitor
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
- List<DataSourceOptimizerStats> stats = optimizer.getAndResetStats();
+ final List<DataSourceOptimizerStats> stats = optimizer.getAndResetStats();
for (DataSourceOptimizerStats stat : stats) {
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
builder.setDimension("dataSource", stat.getBase());
@@ -50,9 +50,9 @@ public class DataSourceOptimizerMonitor extends AbstractMonitor
emitter.emit(builder.build("/materialized/view/query/hitRate", stat.getHitRate()));
emitter.emit(builder.build("/materialized/view/select/avgCostMS", stat.getOptimizerCost()));
Map<String, Long> derivativesStats = stat.getDerivativesHitCount();
- for (String derivative : derivativesStats.keySet()) {
- builder.setDimension("derivative", derivative);
- emitter.emit(builder.build("/materialized/view/derivative/numSelected", derivativesStats.get(derivative)));
+ for (Map.Entry<String, Long> derivative : derivativesStats.entrySet()) {
+ builder.setDimension("derivative", derivative.getKey());
+ emitter.emit(builder.build("/materialized/view/derivative/numSelected", derivative.getValue()));
}
final ServiceMetricEvent.Builder builder2 = new ServiceMetricEvent.Builder();
builder2.setDimension("dataSource", stat.getBase());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 65ffba7..aa9a582 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -45,8 +45,6 @@ import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.CuratorUtils;
@@ -226,97 +224,92 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
// Add listener for creation/deletion of workers
workerPathCache.getListenable().addListener(
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
- {
- final Worker worker;
- switch (event.getType()) {
- case CHILD_ADDED:
- worker = jsonMapper.readValue(
- event.getData().getData(),
- Worker.class
- );
- synchronized (waitingForMonitor) {
- waitingFor.increment();
- }
- Futures.addCallback(
- addWorker(worker),
- new FutureCallback<ZkWorker>()
+ (client, event) -> {
+ final Worker worker;
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ worker = jsonMapper.readValue(
+ event.getData().getData(),
+ Worker.class
+ );
+ synchronized (waitingForMonitor) {
+ waitingFor.increment();
+ }
+ Futures.addCallback(
+ addWorker(worker),
+ new FutureCallback<ZkWorker>()
+ {
+ @Override
+ public void onSuccess(ZkWorker zkWorker)
{
- @Override
- public void onSuccess(ZkWorker zkWorker)
- {
- synchronized (waitingForMonitor) {
- waitingFor.decrement();
- waitingForMonitor.notifyAll();
- }
+ synchronized (waitingForMonitor) {
+ waitingFor.decrement();
+ waitingForMonitor.notifyAll();
}
+ }
- @Override
- public void onFailure(Throwable throwable)
- {
- synchronized (waitingForMonitor) {
- waitingFor.decrement();
- waitingForMonitor.notifyAll();
- }
+ @Override
+ public void onFailure(Throwable throwable)
+ {
+ synchronized (waitingForMonitor) {
+ waitingFor.decrement();
+ waitingForMonitor.notifyAll();
}
}
- );
- break;
- case CHILD_UPDATED:
- worker = jsonMapper.readValue(
- event.getData().getData(),
- Worker.class
- );
- updateWorker(worker);
- break;
-
- case CHILD_REMOVED:
- worker = jsonMapper.readValue(
- event.getData().getData(),
- Worker.class
- );
- removeWorker(worker);
- break;
- case INITIALIZED:
- // Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
- List<String> workers;
- try {
- workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
- }
- catch (KeeperException.NoNodeException e) {
- // statusPath doesn't exist yet; can occur if no middleManagers have started.
- workers = ImmutableList.of();
- }
- for (String workerId : workers) {
- final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
- final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
- if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
- try {
- scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
- }
- catch (Exception e) {
- log.warn(
- e,
- "Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
- workerId,
- workerStatusPath
- );
- }
+ }
+ );
+ break;
+ case CHILD_UPDATED:
+ worker = jsonMapper.readValue(
+ event.getData().getData(),
+ Worker.class
+ );
+ updateWorker(worker);
+ break;
+
+ case CHILD_REMOVED:
+ worker = jsonMapper.readValue(
+ event.getData().getData(),
+ Worker.class
+ );
+ removeWorker(worker);
+ break;
+ case INITIALIZED:
+ // Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
+ List<String> workers;
+ try {
+ workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
+ }
+ catch (KeeperException.NoNodeException e) {
+ // statusPath doesn't exist yet; can occur if no middleManagers have started.
+ workers = ImmutableList.of();
+ }
+ for (String workerId : workers) {
+ final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
+ final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
+ if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
+ try {
+ scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
+ }
+ catch (Exception e) {
+ log.warn(
+ e,
+ "Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
+ workerId,
+ workerStatusPath
+ );
}
}
- synchronized (waitingForMonitor) {
- waitingFor.decrement();
- waitingForMonitor.notifyAll();
- }
- break;
- case CONNECTION_SUSPENDED:
- case CONNECTION_RECONNECTED:
- case CONNECTION_LOST:
- // do nothing
- }
+ }
+ synchronized (waitingForMonitor) {
+ waitingFor.decrement();
+ waitingForMonitor.notifyAll();
+ }
+ break;
+ case CONNECTION_SUSPENDED:
+ case CONNECTION_RECONNECTED:
+ case CONNECTION_LOST:
+ // do nothing
}
}
);
@@ -331,7 +324,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
cleanupExec,
Period.ZERO.toStandardDuration(),
config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
- () -> checkBlackListedNodes()
+ this::checkBlackListedNodes
);
provisioningService = provisioningStrategy.makeProvisioningService(this);
@@ -657,50 +650,45 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private void runPendingTasks()
{
runPendingTasksExec.submit(
- new Callable<Void>()
- {
- @Override
- public Void call()
- {
- try {
- // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
- // into running status
- List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
- sortByInsertionTime(copy);
-
- for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
- String taskId = taskRunnerWorkItem.getTaskId();
- if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
- try {
- //this can still be null due to race from explicit task shutdown request
- //or if another thread steals and completes this task right after this thread makes copy
- //of pending tasks. See https://github.com/apache/incubator-druid/issues/2842 .
- Task task = pendingTaskPayloads.get(taskId);
- if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
- pendingTaskPayloads.remove(taskId);
- }
- }
- catch (Exception e) {
- log.makeAlert(e, "Exception while trying to assign task")
- .addData("taskId", taskRunnerWorkItem.getTaskId())
- .emit();
- RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
- if (workItem != null) {
- taskComplete(workItem, null, TaskStatus.failure(taskId));
- }
+ (Callable<Void>) () -> {
+ try {
+ // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
+ // into running status
+ List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
+ sortByInsertionTime(copy);
+
+ for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
+ String taskId = taskRunnerWorkItem.getTaskId();
+ if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
+ try {
+ //this can still be null due to race from explicit task shutdown request
+ //or if another thread steals and completes this task right after this thread makes copy
+ //of pending tasks. See https://github.com/apache/incubator-druid/issues/2842 .
+ Task task = pendingTaskPayloads.get(taskId);
+ if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
+ pendingTaskPayloads.remove(taskId);
}
- finally {
- tryAssignTasks.remove(taskId);
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Exception while trying to assign task")
+ .addData("taskId", taskRunnerWorkItem.getTaskId())
+ .emit();
+ RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
+ if (workItem != null) {
+ taskComplete(workItem, null, TaskStatus.failure(taskId));
}
}
+ finally {
+ tryAssignTasks.remove(taskId);
+ }
}
}
- catch (Exception e) {
- log.makeAlert(e, "Exception in running pending tasks").emit();
- }
-
- return null;
}
+ catch (Exception e) {
+ log.makeAlert(e, "Exception in running pending tasks").emit();
+ }
+
+ return null;
}
);
}
@@ -782,16 +770,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
Maps.transformEntries(
Maps.filterEntries(
zkWorkers,
- new Predicate<Map.Entry<String, ZkWorker>>()
- {
- @Override
- public boolean apply(Map.Entry<String, ZkWorker> input)
- {
- return !lazyWorkers.containsKey(input.getKey()) &&
- !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
- !blackListedWorkers.contains(input.getValue());
- }
- }
+ input -> !lazyWorkers.containsKey(input.getKey()) &&
+ !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+ !blackListedWorkers.contains(input.getValue())
),
(String key, ZkWorker value) -> value.toImmutable()
)
@@ -935,111 +916,106 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
// Add status listener to the watcher for status changes
zkWorker.addListener(
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
- {
- final String taskId;
- final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
- synchronized (statusLock) {
- try {
- switch (event.getType()) {
- case CHILD_ADDED:
- case CHILD_UPDATED:
- taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
- final TaskAnnouncement announcement = jsonMapper.readValue(
- event.getData().getData(), TaskAnnouncement.class
+ (client, event) -> {
+ final String taskId;
+ final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
+ synchronized (statusLock) {
+ try {
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
+ final TaskAnnouncement announcement = jsonMapper.readValue(
+ event.getData().getData(), TaskAnnouncement.class
+ );
+
+ log.info(
+ "Worker[%s] wrote %s status for task [%s] on [%s]",
+ zkWorker.getWorker().getHost(),
+ announcement.getTaskStatus().getStatusCode(),
+ taskId,
+ announcement.getTaskLocation()
+ );
+
+ // Synchronizing state with ZK
+ statusLock.notifyAll();
+
+ final RemoteTaskRunnerWorkItem tmp;
+ if ((tmp = runningTasks.get(taskId)) != null) {
+ taskRunnerWorkItem = tmp;
+ } else {
+ final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
+ taskId,
+ announcement.getTaskType(),
+ zkWorker.getWorker(),
+ TaskLocation.unknown(),
+ announcement.getTaskDataSource()
);
-
- log.info(
- "Worker[%s] wrote %s status for task [%s] on [%s]",
- zkWorker.getWorker().getHost(),
- announcement.getTaskStatus().getStatusCode(),
+ final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId,
- announcement.getTaskLocation()
+ newTaskRunnerWorkItem
);
-
- // Synchronizing state with ZK
- statusLock.notifyAll();
-
- final RemoteTaskRunnerWorkItem tmp;
- if ((tmp = runningTasks.get(taskId)) != null) {
- taskRunnerWorkItem = tmp;
- } else {
- final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
- taskId,
- announcement.getTaskType(),
- zkWorker.getWorker(),
- TaskLocation.unknown(),
- announcement.getTaskDataSource()
- );
- final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
- taskId,
- newTaskRunnerWorkItem
+ if (existingItem == null) {
+ log.warn(
+ "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
+ zkWorker.getWorker().getHost(),
+ taskId
);
- if (existingItem == null) {
- log.warn(
- "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
- zkWorker.getWorker().getHost(),
- taskId
- );
- taskRunnerWorkItem = newTaskRunnerWorkItem;
- } else {
- taskRunnerWorkItem = existingItem;
- }
+ taskRunnerWorkItem = newTaskRunnerWorkItem;
+ } else {
+ taskRunnerWorkItem = existingItem;
}
+ }
- if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
- taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
- TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
- }
+ if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
+ taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
+ TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
+ }
- if (announcement.getTaskStatus().isComplete()) {
- taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
- runPendingTasks();
- }
- break;
- case CHILD_REMOVED:
- taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
- taskRunnerWorkItem = runningTasks.remove(taskId);
- if (taskRunnerWorkItem != null) {
- log.info("Task[%s] just disappeared!", taskId);
- taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
- TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
- } else {
- log.info("Task[%s] went bye bye.", taskId);
- }
- break;
- case INITIALIZED:
- if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
- retVal.set(zkWorker);
- } else {
- final String message = StringUtils.format(
- "WTF?! Tried to add already-existing worker[%s]",
- worker.getHost()
- );
- log.makeAlert(message)
- .addData("workerHost", worker.getHost())
- .addData("workerIp", worker.getIp())
- .emit();
- retVal.setException(new IllegalStateException(message));
- }
+ if (announcement.getTaskStatus().isComplete()) {
+ taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
runPendingTasks();
- break;
- case CONNECTION_SUSPENDED:
- case CONNECTION_RECONNECTED:
- case CONNECTION_LOST:
- // do nothing
- }
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to handle new worker status")
- .addData("worker", zkWorker.getWorker().getHost())
- .addData("znode", event.getData().getPath())
- .emit();
+ }
+ break;
+ case CHILD_REMOVED:
+ taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
+ taskRunnerWorkItem = runningTasks.remove(taskId);
+ if (taskRunnerWorkItem != null) {
+ log.info("Task[%s] just disappeared!", taskId);
+ taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
+ TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
+ } else {
+ log.info("Task[%s] went bye bye.", taskId);
+ }
+ break;
+ case INITIALIZED:
+ if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
+ retVal.set(zkWorker);
+ } else {
+ final String message = StringUtils.format(
+ "WTF?! Tried to add already-existing worker[%s]",
+ worker.getHost()
+ );
+ log.makeAlert(message)
+ .addData("workerHost", worker.getHost())
+ .addData("workerIp", worker.getIp())
+ .emit();
+ retVal.setException(new IllegalStateException(message));
+ }
+ runPendingTasks();
+ break;
+ case CONNECTION_SUSPENDED:
+ case CONNECTION_RECONNECTED:
+ case CONNECTION_LOST:
+ // do nothing
}
}
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to handle new worker status")
+ .addData("worker", zkWorker.getWorker().getHost())
+ .addData("znode", event.getData().getPath())
+ .emit();
+ }
}
}
);
@@ -1113,45 +1089,40 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
cancelWorkerCleanup(worker);
final ListenableScheduledFuture<?> cleanupTask = cleanupExec.schedule(
- new Runnable()
- {
- @Override
- public void run()
- {
- log.info("Running scheduled cleanup for Worker[%s]", worker);
- try {
- for (String assignedTask : tasksToFail) {
- String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
- String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
- if (cf.checkExists().forPath(taskPath) != null) {
- cf.delete().guaranteed().forPath(taskPath);
- }
-
- if (cf.checkExists().forPath(statusPath) != null) {
- cf.delete().guaranteed().forPath(statusPath);
- }
+ () -> {
+ log.info("Running scheduled cleanup for Worker[%s]", worker);
+ try {
+ for (String assignedTask : tasksToFail) {
+ String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
+ String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
+ if (cf.checkExists().forPath(taskPath) != null) {
+ cf.delete().guaranteed().forPath(taskPath);
+ }
- log.info("Failing task[%s]", assignedTask);
- RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
- if (taskRunnerWorkItem != null) {
- taskRunnerWorkItem.setResult(TaskStatus.failure(assignedTask));
- TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, TaskStatus.failure(assignedTask));
- } else {
- log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
- }
+ if (cf.checkExists().forPath(statusPath) != null) {
+ cf.delete().guaranteed().forPath(statusPath);
}
- // worker is gone, remove worker task status announcements path.
- String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
- if (cf.checkExists().forPath(workerStatusPath) != null) {
- cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
+ log.info("Failing task[%s]", assignedTask);
+ RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
+ if (taskRunnerWorkItem != null) {
+ taskRunnerWorkItem.setResult(TaskStatus.failure(assignedTask));
+ TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, TaskStatus.failure(assignedTask));
+ } else {
+ log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
}
- catch (Exception e) {
- log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
- throw new RuntimeException(e);
+
+ // worker is gone, remove worker task status announcements path.
+ String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
+ if (cf.checkExists().forPath(workerStatusPath) != null) {
+ cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
}
}
+ catch (Exception e) {
+ log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
+ throw new RuntimeException(e);
+ }
},
config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
@@ -1248,14 +1219,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
- Iterator<String> iterator = zkWorkers.keySet().iterator();
- while (iterator.hasNext()) {
- String worker = iterator.next();
- ZkWorker zkWorker = zkWorkers.get(worker);
+ for (Map.Entry<String, ZkWorker> worker : zkWorkers.entrySet()) {
+ final ZkWorker zkWorker = worker.getValue();
try {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
- lazyWorkers.put(worker, zkWorker);
+ lazyWorkers.put(worker.getKey(), zkWorker);
if (lazyWorkers.size() == maxWorkers) {
// only mark excess workers as lazy and allow their cleanup
break;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 03ee844..e5fa375 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -236,7 +236,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
cleanupExec,
Period.ZERO.toStandardDuration(),
config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
- () -> checkAndRemoveWorkersFromBlackList()
+ this::checkAndRemoveWorkersFromBlackList
);
provisioningService = provisioningStrategy.makeProvisioningService(this);
@@ -329,16 +329,9 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
Maps.transformEntries(
Maps.filterEntries(
workers,
- new Predicate<Map.Entry<String, WorkerHolder>>()
- {
- @Override
- public boolean apply(Map.Entry<String, WorkerHolder> input)
- {
- return !lazyWorkers.containsKey(input.getKey()) &&
- !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
- !blackListedWorkers.containsKey(input.getKey());
- }
- }
+ input -> !lazyWorkers.containsKey(input.getKey()) &&
+ !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+ !blackListedWorkers.containsKey(input.getKey())
),
(String key, WorkerHolder value) -> value.toImmutable()
)
@@ -566,41 +559,36 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
cancelWorkerCleanup(workerHostAndPort);
final ListenableScheduledFuture<?> cleanupTask = cleanupExec.schedule(
- new Runnable()
- {
- @Override
- public void run()
- {
- log.info("Running scheduled cleanup for Worker[%s]", workerHostAndPort);
- try {
- Set<HttpRemoteTaskRunnerWorkItem> tasksToFail = new HashSet<>();
- synchronized (statusLock) {
- for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : tasks.entrySet()) {
- if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
- Worker w = e.getValue().getWorker();
- if (w != null && w.getHost().equals(workerHostAndPort)) {
- tasksToFail.add(e.getValue());
- }
+ () -> {
+ log.info("Running scheduled cleanup for Worker[%s]", workerHostAndPort);
+ try {
+ Set<HttpRemoteTaskRunnerWorkItem> tasksToFail = new HashSet<>();
+ synchronized (statusLock) {
+ for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : tasks.entrySet()) {
+ if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
+ Worker w = e.getValue().getWorker();
+ if (w != null && w.getHost().equals(workerHostAndPort)) {
+ tasksToFail.add(e.getValue());
}
}
}
+ }
- for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
- if (!taskItem.getResult().isDone()) {
- log.info(
- "Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].",
- workerHostAndPort,
- taskItem.getTaskId(),
- config.getTaskCleanupTimeout()
- );
- taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
- }
+ for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
+ if (!taskItem.getResult().isDone()) {
+ log.info(
+ "Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].",
+ workerHostAndPort,
+ taskItem.getTaskId(),
+ config.getTaskCleanupTimeout()
+ );
+ taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
}
}
- catch (Exception e) {
- log.makeAlert("Exception while cleaning up worker[%s]", workerHostAndPort).emit();
- throw new RuntimeException(e);
- }
+ }
+ catch (Exception e) {
+ log.makeAlert("Exception while cleaning up worker[%s]", workerHostAndPort).emit();
+ throw new RuntimeException(e);
}
},
config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
@@ -779,14 +767,12 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
{
synchronized (statusLock) {
- Iterator<String> iterator = workers.keySet().iterator();
- while (iterator.hasNext()) {
- String worker = iterator.next();
- WorkerHolder workerHolder = workers.get(worker);
+ for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) {
+ final WorkerHolder workerHolder = worker.getValue();
try {
if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost());
- lazyWorkers.put(worker, workerHolder);
+ lazyWorkers.put(worker.getKey(), workerHolder);
if (lazyWorkers.size() == maxWorkers) {
// only mark excess workers as lazy and allow their cleanup
break;
@@ -835,7 +821,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return tasks.values()
.stream()
.filter(item -> item.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING)
- .map(item -> item.getTask())
+ .map(HttpRemoteTaskRunnerWorkItem::getTask)
.collect(Collectors.toList());
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 4872f20..24b4b1d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -115,8 +115,8 @@ public class SupervisorManager
synchronized (lock) {
Map<String, SupervisorSpec> supervisors = metadataSupervisorManager.getLatest();
- for (String id : supervisors.keySet()) {
- SupervisorSpec spec = supervisors.get(id);
+ for (Map.Entry<String, SupervisorSpec> supervisor : supervisors.entrySet()) {
+ final SupervisorSpec spec = supervisor.getValue();
if (!(spec instanceof NoopSupervisorSpec)) {
try {
createAndStartSupervisorInternal(spec, false);
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
index e176734..dd3f880 100644
--- a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
@@ -95,14 +95,14 @@ class LookupListeningResource extends ListenerResource
public Object post(final Map<String, LookupExtractorFactory> lookups)
{
final Map<String, LookupExtractorFactory> failedUpdates = new HashMap<>();
- for (final String name : lookups.keySet()) {
+ for (final Map.Entry<String, LookupExtractorFactory> lookup : lookups.entrySet()) {
final LookupExtractorFactoryContainer factoryContainer = new LookupExtractorFactoryContainer(
null,
- lookups.get(name)
+ lookup.getValue()
);
- manager.add(name, factoryContainer);
+ manager.add(lookup.getKey(), factoryContainer);
}
return ImmutableMap.of("status", "accepted", LookupModule.FAILED_UPDATES_KEY, failedUpdates);
}
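
The KeyedDiff change at the top of this patch is representative of the whole cleanup: subtract() now walks the left-hand map's entrySet() and performs a single lookup into the right-hand map per key, instead of two map reads per key. A self-contained sketch of that shape (SubtractExample is a hypothetical name; the subtract body mirrors the patched method), assuming both maps share a key set as the assert requires:

    import java.util.HashMap;
    import java.util.Map;

    public class SubtractExample
    {
      public static Map<String, Long> subtract(Map<String, Long> lhs, Map<String, Long> rhs)
      {
        assert lhs.keySet().equals(rhs.keySet());
        final Map<String, Long> result = new HashMap<>();
        // One pass over lhs entries; one rhs lookup per key.
        for (Map.Entry<String, Long> entry : lhs.entrySet()) {
          result.put(entry.getKey(), entry.getValue() - rhs.get(entry.getKey()));
        }
        return result;
      }

      public static void main(String[] args)
      {
        final Map<String, Long> now = new HashMap<>();
        final Map<String, Long> before = new HashMap<>();
        now.put("bytesRead", 150L);
        before.put("bytesRead", 100L);
        System.out.println(subtract(now, before)); // prints {bytesRead=50}
      }
    }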