You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by le...@apache.org on 2019/07/30 16:52:03 UTC

[incubator-druid] branch master updated: Enable Spotbugs: WMI_WRONG_MAP_ITERATOR (#8005)

This is an automated email from the ASF dual-hosted git repository.

leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e016995  Enable Spotbugs: WMI_WRONG_MAP_ITERATOR (#8005)
e016995 is described below

commit e016995d1fb6085a48e2e0d0a28504ed955f0293
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Tue Jul 30 18:51:53 2019 +0200

    Enable Spotbugs: WMI_WRONG_MAP_ITERATOR (#8005)
    
    * WMI_WRONG_MAP_ITERATOR
    
    * Fixed missing loop
---
 codestyle/spotbugs-exclude.xml                     |   1 -
 .../apache/druid/java/util/metrics/KeyedDiff.java  |  10 +-
 .../MaterializedViewSupervisor.java                |  14 +-
 .../DataSourceOptimizerMonitor.java                |   8 +-
 .../druid/indexing/overlord/RemoteTaskRunner.java  | 503 ++++++++++-----------
 .../overlord/hrtr/HttpRemoteTaskRunner.java        |  78 ++--
 .../overlord/supervisor/SupervisorManager.java     |   4 +-
 .../query/lookup/LookupListeningResource.java      |   6 +-
 8 files changed, 287 insertions(+), 337 deletions(-)

diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml
index eb112b2..ca9fb08 100644
--- a/codestyle/spotbugs-exclude.xml
+++ b/codestyle/spotbugs-exclude.xml
@@ -88,5 +88,4 @@
     <Bug pattern="SWL_SLEEP_WITH_LOCK_HELD"/>
     <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH"/>
     <Bug pattern="URF_UNREAD_FIELD"/>
-    <Bug pattern="WMI_WRONG_MAP_ITERATOR"/>
 </FindBugsFilter>
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/KeyedDiff.java b/core/src/main/java/org/apache/druid/java/util/metrics/KeyedDiff.java
index e4bfd04..c1a3bae 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/KeyedDiff.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/KeyedDiff.java
@@ -41,12 +41,12 @@ public class KeyedDiff
     }
   }
 
-  public static Map<String, Long> subtract(Map<String, Long> xs, Map<String, Long> ys)
+  public static Map<String, Long> subtract(Map<String, Long> lhs, Map<String, Long> rhs)
   {
-    assert xs.keySet().equals(ys.keySet());
-    final Map<String, Long> zs = new HashMap<String, Long>();
-    for (String k : xs.keySet()) {
-      zs.put(k, xs.get(k) - ys.get(k));
+    assert lhs.keySet().equals(rhs.keySet());
+    final Map<String, Long> zs = new HashMap<>();
+    for (Map.Entry<String, Long> k : lhs.entrySet()) {
+      zs.put(k.getKey(), k.getValue() - rhs.get(k.getKey()));
     }
     return zs;
   }
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 1ed80ca..378ed50 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -371,16 +371,12 @@ public class MaterializedViewSupervisor implements Supervisor
     Map<Interval, String> toDropInterval = new HashMap<>(difference.entriesOnlyOnRight());
     // if some intervals are in running tasks and the versions are the same, remove it from toBuildInterval
     // if some intervals are in running tasks, but the versions are different, stop the task. 
-    for (Interval interval : runningVersion.keySet()) {
-      if (toBuildInterval.containsKey(interval)
-          && toBuildInterval.get(interval).equals(runningVersion.get(interval))
-          ) {
+    for (Map.Entry<Interval, String> version : runningVersion.entrySet()) {
+      final Interval interval = version.getKey();
+      final String host = version.getValue();
+      if (toBuildInterval.containsKey(interval) && toBuildInterval.get(interval).equals(host)) {
         toBuildInterval.remove(interval);
-
-      } else if (
-          toBuildInterval.containsKey(interval)
-          && !toBuildInterval.get(interval).equals(runningVersion.get(interval))
-      ) {
+      } else if (toBuildInterval.containsKey(interval) && !toBuildInterval.get(interval).equals(host)) {
         if (taskMaster.getTaskQueue().isPresent()) {
           taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId(), "version mismatch");
           runningTasks.remove(interval);
diff --git a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizerMonitor.java b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizerMonitor.java
index 0720957..08b0b25 100644
--- a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizerMonitor.java
+++ b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizerMonitor.java
@@ -41,7 +41,7 @@ public class DataSourceOptimizerMonitor extends AbstractMonitor
   @Override
   public boolean doMonitor(ServiceEmitter emitter) 
   {
-    List<DataSourceOptimizerStats> stats = optimizer.getAndResetStats();
+    final List<DataSourceOptimizerStats> stats = optimizer.getAndResetStats();
     for (DataSourceOptimizerStats stat : stats) {
       final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
       builder.setDimension("dataSource", stat.getBase());
@@ -50,9 +50,9 @@ public class DataSourceOptimizerMonitor extends AbstractMonitor
       emitter.emit(builder.build("/materialized/view/query/hitRate", stat.getHitRate()));
       emitter.emit(builder.build("/materialized/view/select/avgCostMS", stat.getOptimizerCost()));
       Map<String, Long> derivativesStats = stat.getDerivativesHitCount();
-      for (String derivative : derivativesStats.keySet()) {
-        builder.setDimension("derivative", derivative);
-        emitter.emit(builder.build("/materialized/view/derivative/numSelected", derivativesStats.get(derivative)));
+      for (Map.Entry<String, Long> derivative : derivativesStats.entrySet()) {
+        builder.setDimension("derivative", derivative.getKey());
+        emitter.emit(builder.build("/materialized/view/derivative/numSelected", derivative.getValue()));
       }
       final ServiceMetricEvent.Builder builder2 = new ServiceMetricEvent.Builder();
       builder2.setDimension("dataSource", stat.getBase());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 65ffba7..aa9a582 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -45,8 +45,6 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.curator.CuratorUtils;
@@ -226,97 +224,92 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
 
       // Add listener for creation/deletion of workers
       workerPathCache.getListenable().addListener(
-          new PathChildrenCacheListener()
-          {
-            @Override
-            public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
-            {
-              final Worker worker;
-              switch (event.getType()) {
-                case CHILD_ADDED:
-                  worker = jsonMapper.readValue(
-                      event.getData().getData(),
-                      Worker.class
-                  );
-                  synchronized (waitingForMonitor) {
-                    waitingFor.increment();
-                  }
-                  Futures.addCallback(
-                      addWorker(worker),
-                      new FutureCallback<ZkWorker>()
+          (client, event) -> {
+            final Worker worker;
+            switch (event.getType()) {
+              case CHILD_ADDED:
+                worker = jsonMapper.readValue(
+                    event.getData().getData(),
+                    Worker.class
+                );
+                synchronized (waitingForMonitor) {
+                  waitingFor.increment();
+                }
+                Futures.addCallback(
+                    addWorker(worker),
+                    new FutureCallback<ZkWorker>()
+                    {
+                      @Override
+                      public void onSuccess(ZkWorker zkWorker)
                       {
-                        @Override
-                        public void onSuccess(ZkWorker zkWorker)
-                        {
-                          synchronized (waitingForMonitor) {
-                            waitingFor.decrement();
-                            waitingForMonitor.notifyAll();
-                          }
+                        synchronized (waitingForMonitor) {
+                          waitingFor.decrement();
+                          waitingForMonitor.notifyAll();
                         }
+                      }
 
-                        @Override
-                        public void onFailure(Throwable throwable)
-                        {
-                          synchronized (waitingForMonitor) {
-                            waitingFor.decrement();
-                            waitingForMonitor.notifyAll();
-                          }
+                      @Override
+                      public void onFailure(Throwable throwable)
+                      {
+                        synchronized (waitingForMonitor) {
+                          waitingFor.decrement();
+                          waitingForMonitor.notifyAll();
                         }
                       }
-                  );
-                  break;
-                case CHILD_UPDATED:
-                  worker = jsonMapper.readValue(
-                      event.getData().getData(),
-                      Worker.class
-                  );
-                  updateWorker(worker);
-                  break;
-
-                case CHILD_REMOVED:
-                  worker = jsonMapper.readValue(
-                      event.getData().getData(),
-                      Worker.class
-                  );
-                  removeWorker(worker);
-                  break;
-                case INITIALIZED:
-                  // Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
-                  List<String> workers;
-                  try {
-                    workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
-                  }
-                  catch (KeeperException.NoNodeException e) {
-                    // statusPath doesn't exist yet; can occur if no middleManagers have started.
-                    workers = ImmutableList.of();
-                  }
-                  for (String workerId : workers) {
-                    final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
-                    final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
-                    if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
-                      try {
-                        scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
-                      }
-                      catch (Exception e) {
-                        log.warn(
-                            e,
-                            "Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
-                            workerId,
-                            workerStatusPath
-                        );
-                      }
+                    }
+                );
+                break;
+              case CHILD_UPDATED:
+                worker = jsonMapper.readValue(
+                    event.getData().getData(),
+                    Worker.class
+                );
+                updateWorker(worker);
+                break;
+
+              case CHILD_REMOVED:
+                worker = jsonMapper.readValue(
+                    event.getData().getData(),
+                    Worker.class
+                );
+                removeWorker(worker);
+                break;
+              case INITIALIZED:
+                // Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
+                List<String> workers;
+                try {
+                  workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
+                }
+                catch (KeeperException.NoNodeException e) {
+                  // statusPath doesn't exist yet; can occur if no middleManagers have started.
+                  workers = ImmutableList.of();
+                }
+                for (String workerId : workers) {
+                  final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
+                  final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
+                  if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
+                    try {
+                      scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
+                    }
+                    catch (Exception e) {
+                      log.warn(
+                          e,
+                          "Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
+                          workerId,
+                          workerStatusPath
+                      );
                     }
                   }
-                  synchronized (waitingForMonitor) {
-                    waitingFor.decrement();
-                    waitingForMonitor.notifyAll();
-                  }
-                  break;
-                case CONNECTION_SUSPENDED:
-                case CONNECTION_RECONNECTED:
-                case CONNECTION_LOST:
-                  // do nothing
-              }
+                }
+                synchronized (waitingForMonitor) {
+                  waitingFor.decrement();
+                  waitingForMonitor.notifyAll();
+                }
+                break;
+              case CONNECTION_SUSPENDED:
+              case CONNECTION_RECONNECTED:
+              case CONNECTION_LOST:
+                // do nothing
             }
           }
       );
@@ -331,7 +324,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
           cleanupExec,
           Period.ZERO.toStandardDuration(),
           config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
-          () -> checkBlackListedNodes()
+          this::checkBlackListedNodes
       );
 
       provisioningService = provisioningStrategy.makeProvisioningService(this);
@@ -657,50 +650,45 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   private void runPendingTasks()
   {
     runPendingTasksExec.submit(
-        new Callable<Void>()
-        {
-          @Override
-          public Void call()
-          {
-            try {
-              // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
-              // into running status
-              List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
-              sortByInsertionTime(copy);
-
-              for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
-                String taskId = taskRunnerWorkItem.getTaskId();
-                if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
-                  try {
-                    //this can still be null due to race from explicit task shutdown request
-                    //or if another thread steals and completes this task right after this thread makes copy
-                    //of pending tasks. See https://github.com/apache/incubator-druid/issues/2842 .
-                    Task task = pendingTaskPayloads.get(taskId);
-                    if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
-                      pendingTaskPayloads.remove(taskId);
-                    }
-                  }
-                  catch (Exception e) {
-                    log.makeAlert(e, "Exception while trying to assign task")
-                       .addData("taskId", taskRunnerWorkItem.getTaskId())
-                       .emit();
-                    RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
-                    if (workItem != null) {
-                      taskComplete(workItem, null, TaskStatus.failure(taskId));
-                    }
+        (Callable<Void>) () -> {
+          try {
+            // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
+            // into running status
+            List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
+            sortByInsertionTime(copy);
+
+            for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
+              String taskId = taskRunnerWorkItem.getTaskId();
+              if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
+                try {
+                  //this can still be null due to race from explicit task shutdown request
+                  //or if another thread steals and completes this task right after this thread makes copy
+                  //of pending tasks. See https://github.com/apache/incubator-druid/issues/2842 .
+                  Task task = pendingTaskPayloads.get(taskId);
+                  if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
+                    pendingTaskPayloads.remove(taskId);
                   }
-                  finally {
-                    tryAssignTasks.remove(taskId);
+                }
+                catch (Exception e) {
+                  log.makeAlert(e, "Exception while trying to assign task")
+                     .addData("taskId", taskRunnerWorkItem.getTaskId())
+                     .emit();
+                  RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
+                  if (workItem != null) {
+                    taskComplete(workItem, null, TaskStatus.failure(taskId));
                   }
                 }
+                finally {
+                  tryAssignTasks.remove(taskId);
+                }
               }
             }
-            catch (Exception e) {
-              log.makeAlert(e, "Exception in running pending tasks").emit();
-            }
-
-            return null;
           }
+          catch (Exception e) {
+            log.makeAlert(e, "Exception in running pending tasks").emit();
+          }
+
+          return null;
         }
     );
   }
@@ -782,16 +770,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
                   Maps.transformEntries(
                       Maps.filterEntries(
                           zkWorkers,
-                          new Predicate<Map.Entry<String, ZkWorker>>()
-                          {
-                            @Override
-                            public boolean apply(Map.Entry<String, ZkWorker> input)
-                            {
-                              return !lazyWorkers.containsKey(input.getKey()) &&
-                                     !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
-                                     !blackListedWorkers.contains(input.getValue());
-                            }
-                          }
+                          input -> !lazyWorkers.containsKey(input.getKey()) &&
+                                 !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+                                 !blackListedWorkers.contains(input.getValue())
                       ),
                       (String key, ZkWorker value) -> value.toImmutable()
                   )
@@ -935,111 +916,106 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
 
       // Add status listener to the watcher for status changes
       zkWorker.addListener(
-          new PathChildrenCacheListener()
-          {
-            @Override
-            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
-            {
-              final String taskId;
-              final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
-              synchronized (statusLock) {
-                try {
-                  switch (event.getType()) {
-                    case CHILD_ADDED:
-                    case CHILD_UPDATED:
-                      taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
-                      final TaskAnnouncement announcement = jsonMapper.readValue(
-                          event.getData().getData(), TaskAnnouncement.class
+          (client, event) -> {
+            final String taskId;
+            final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
+            synchronized (statusLock) {
+              try {
+                switch (event.getType()) {
+                  case CHILD_ADDED:
+                  case CHILD_UPDATED:
+                    taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
+                    final TaskAnnouncement announcement = jsonMapper.readValue(
+                        event.getData().getData(), TaskAnnouncement.class
+                    );
+
+                    log.info(
+                        "Worker[%s] wrote %s status for task [%s] on [%s]",
+                        zkWorker.getWorker().getHost(),
+                        announcement.getTaskStatus().getStatusCode(),
+                        taskId,
+                        announcement.getTaskLocation()
+                    );
+
+                    // Synchronizing state with ZK
+                    statusLock.notifyAll();
+
+                    final RemoteTaskRunnerWorkItem tmp;
+                    if ((tmp = runningTasks.get(taskId)) != null) {
+                      taskRunnerWorkItem = tmp;
+                    } else {
+                      final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
+                          taskId,
+                          announcement.getTaskType(),
+                          zkWorker.getWorker(),
+                          TaskLocation.unknown(),
+                          announcement.getTaskDataSource()
                       );
-
-                      log.info(
-                          "Worker[%s] wrote %s status for task [%s] on [%s]",
-                          zkWorker.getWorker().getHost(),
-                          announcement.getTaskStatus().getStatusCode(),
+                      final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
                           taskId,
-                          announcement.getTaskLocation()
+                          newTaskRunnerWorkItem
                       );
-
-                      // Synchronizing state with ZK
-                      statusLock.notifyAll();
-
-                      final RemoteTaskRunnerWorkItem tmp;
-                      if ((tmp = runningTasks.get(taskId)) != null) {
-                        taskRunnerWorkItem = tmp;
-                      } else {
-                        final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
-                            taskId,
-                            announcement.getTaskType(),
-                            zkWorker.getWorker(),
-                            TaskLocation.unknown(),
-                            announcement.getTaskDataSource()
-                        );
-                        final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
-                            taskId,
-                            newTaskRunnerWorkItem
+                      if (existingItem == null) {
+                        log.warn(
+                            "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
+                            zkWorker.getWorker().getHost(),
+                            taskId
                         );
-                        if (existingItem == null) {
-                          log.warn(
-                              "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
-                              zkWorker.getWorker().getHost(),
-                              taskId
-                          );
-                          taskRunnerWorkItem = newTaskRunnerWorkItem;
-                        } else {
-                          taskRunnerWorkItem = existingItem;
-                        }
+                        taskRunnerWorkItem = newTaskRunnerWorkItem;
+                      } else {
+                        taskRunnerWorkItem = existingItem;
                       }
+                    }
 
-                      if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
-                        taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
-                        TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
-                      }
+                    if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
+                      taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
+                      TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
+                    }
 
-                      if (announcement.getTaskStatus().isComplete()) {
-                        taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
-                        runPendingTasks();
-                      }
-                      break;
-                    case CHILD_REMOVED:
-                      taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
-                      taskRunnerWorkItem = runningTasks.remove(taskId);
-                      if (taskRunnerWorkItem != null) {
-                        log.info("Task[%s] just disappeared!", taskId);
-                        taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
-                        TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
-                      } else {
-                        log.info("Task[%s] went bye bye.", taskId);
-                      }
-                      break;
-                    case INITIALIZED:
-                      if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
-                        retVal.set(zkWorker);
-                      } else {
-                        final String message = StringUtils.format(
-                            "WTF?! Tried to add already-existing worker[%s]",
-                            worker.getHost()
-                        );
-                        log.makeAlert(message)
-                           .addData("workerHost", worker.getHost())
-                           .addData("workerIp", worker.getIp())
-                           .emit();
-                        retVal.setException(new IllegalStateException(message));
-                      }
+                    if (announcement.getTaskStatus().isComplete()) {
+                      taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
                       runPendingTasks();
-                      break;
-                    case CONNECTION_SUSPENDED:
-                    case CONNECTION_RECONNECTED:
-                    case CONNECTION_LOST:
-                      // do nothing
-                  }
-                }
-                catch (Exception e) {
-                  log.makeAlert(e, "Failed to handle new worker status")
-                     .addData("worker", zkWorker.getWorker().getHost())
-                     .addData("znode", event.getData().getPath())
-                     .emit();
+                    }
+                    break;
+                  case CHILD_REMOVED:
+                    taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
+                    taskRunnerWorkItem = runningTasks.remove(taskId);
+                    if (taskRunnerWorkItem != null) {
+                      log.info("Task[%s] just disappeared!", taskId);
+                      taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
+                      TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
+                    } else {
+                      log.info("Task[%s] went bye bye.", taskId);
+                    }
+                    break;
+                  case INITIALIZED:
+                    if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
+                      retVal.set(zkWorker);
+                    } else {
+                      final String message = StringUtils.format(
+                          "WTF?! Tried to add already-existing worker[%s]",
+                          worker.getHost()
+                      );
+                      log.makeAlert(message)
+                         .addData("workerHost", worker.getHost())
+                         .addData("workerIp", worker.getIp())
+                         .emit();
+                      retVal.setException(new IllegalStateException(message));
+                    }
+                    runPendingTasks();
+                    break;
+                  case CONNECTION_SUSPENDED:
+                  case CONNECTION_RECONNECTED:
+                  case CONNECTION_LOST:
+                    // do nothing
                 }
               }
+              catch (Exception e) {
+                log.makeAlert(e, "Failed to handle new worker status")
+                   .addData("worker", zkWorker.getWorker().getHost())
+                   .addData("znode", event.getData().getPath())
+                   .emit();
+              }
             }
           }
       );
@@ -1113,45 +1089,40 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     cancelWorkerCleanup(worker);
 
     final ListenableScheduledFuture<?> cleanupTask = cleanupExec.schedule(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            log.info("Running scheduled cleanup for Worker[%s]", worker);
-            try {
-              for (String assignedTask : tasksToFail) {
-                String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
-                String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
-                if (cf.checkExists().forPath(taskPath) != null) {
-                  cf.delete().guaranteed().forPath(taskPath);
-                }
-
-                if (cf.checkExists().forPath(statusPath) != null) {
-                  cf.delete().guaranteed().forPath(statusPath);
-                }
+        () -> {
+          log.info("Running scheduled cleanup for Worker[%s]", worker);
+          try {
+            for (String assignedTask : tasksToFail) {
+              String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
+              String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
+              if (cf.checkExists().forPath(taskPath) != null) {
+                cf.delete().guaranteed().forPath(taskPath);
+              }
 
-                log.info("Failing task[%s]", assignedTask);
-                RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
-                if (taskRunnerWorkItem != null) {
-                  taskRunnerWorkItem.setResult(TaskStatus.failure(assignedTask));
-                  TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, TaskStatus.failure(assignedTask));
-                } else {
-                  log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
-                }
+              if (cf.checkExists().forPath(statusPath) != null) {
+                cf.delete().guaranteed().forPath(statusPath);
               }
 
-              // worker is gone, remove worker task status announcements path.
-              String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
-              if (cf.checkExists().forPath(workerStatusPath) != null) {
-                cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
+              log.info("Failing task[%s]", assignedTask);
+              RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
+              if (taskRunnerWorkItem != null) {
+                taskRunnerWorkItem.setResult(TaskStatus.failure(assignedTask));
+                TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, TaskStatus.failure(assignedTask));
+              } else {
+                log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
               }
             }
-            catch (Exception e) {
-              log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
-              throw new RuntimeException(e);
+
+            // worker is gone, remove worker task status announcements path.
+            String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
+            if (cf.checkExists().forPath(workerStatusPath) != null) {
+              cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
             }
           }
+          catch (Exception e) {
+            log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
+            throw new RuntimeException(e);
+          }
         },
         config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
         TimeUnit.MILLISECONDS
@@ -1248,14 +1219,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     }
     // status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
     synchronized (statusLock) {
-      Iterator<String> iterator = zkWorkers.keySet().iterator();
-      while (iterator.hasNext()) {
-        String worker = iterator.next();
-        ZkWorker zkWorker = zkWorkers.get(worker);
+      for (Map.Entry<String, ZkWorker> worker : zkWorkers.entrySet()) {
+        final ZkWorker zkWorker = worker.getValue();
         try {
           if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
             log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
-            lazyWorkers.put(worker, zkWorker);
+            lazyWorkers.put(worker.getKey(), zkWorker);
             if (lazyWorkers.size() == maxWorkers) {
               // only mark excess workers as lazy and allow their cleanup
               break;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 03ee844..e5fa375 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -236,7 +236,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
           cleanupExec,
           Period.ZERO.toStandardDuration(),
           config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
-          () -> checkAndRemoveWorkersFromBlackList()
+          this::checkAndRemoveWorkersFromBlackList
       );
 
       provisioningService = provisioningStrategy.makeProvisioningService(this);
@@ -329,16 +329,9 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
             Maps.transformEntries(
                 Maps.filterEntries(
                     workers,
-                    new Predicate<Map.Entry<String, WorkerHolder>>()
-                    {
-                      @Override
-                      public boolean apply(Map.Entry<String, WorkerHolder> input)
-                      {
-                        return !lazyWorkers.containsKey(input.getKey()) &&
-                               !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
-                               !blackListedWorkers.containsKey(input.getKey());
-                      }
-                    }
+                    input -> !lazyWorkers.containsKey(input.getKey()) &&
+                           !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+                           !blackListedWorkers.containsKey(input.getKey())
                 ),
                 (String key, WorkerHolder value) -> value.toImmutable()
             )
@@ -566,41 +559,36 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     cancelWorkerCleanup(workerHostAndPort);
 
     final ListenableScheduledFuture<?> cleanupTask = cleanupExec.schedule(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            log.info("Running scheduled cleanup for Worker[%s]", workerHostAndPort);
-            try {
-              Set<HttpRemoteTaskRunnerWorkItem> tasksToFail = new HashSet<>();
-              synchronized (statusLock) {
-                for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : tasks.entrySet()) {
-                  if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
-                    Worker w = e.getValue().getWorker();
-                    if (w != null && w.getHost().equals(workerHostAndPort)) {
-                      tasksToFail.add(e.getValue());
-                    }
+        () -> {
+          log.info("Running scheduled cleanup for Worker[%s]", workerHostAndPort);
+          try {
+            Set<HttpRemoteTaskRunnerWorkItem> tasksToFail = new HashSet<>();
+            synchronized (statusLock) {
+              for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : tasks.entrySet()) {
+                if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
+                  Worker w = e.getValue().getWorker();
+                  if (w != null && w.getHost().equals(workerHostAndPort)) {
+                    tasksToFail.add(e.getValue());
                   }
                 }
               }
+            }
 
-              for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
-                if (!taskItem.getResult().isDone()) {
-                  log.info(
-                      "Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].",
-                      workerHostAndPort,
-                      taskItem.getTaskId(),
-                      config.getTaskCleanupTimeout()
-                  );
-                  taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
-                }
+            for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
+              if (!taskItem.getResult().isDone()) {
+                log.info(
+                    "Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].",
+                    workerHostAndPort,
+                    taskItem.getTaskId(),
+                    config.getTaskCleanupTimeout()
+                );
+                taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
               }
             }
-            catch (Exception e) {
-              log.makeAlert("Exception while cleaning up worker[%s]", workerHostAndPort).emit();
-              throw new RuntimeException(e);
-            }
+          }
+          catch (Exception e) {
+            log.makeAlert("Exception while cleaning up worker[%s]", workerHostAndPort).emit();
+            throw new RuntimeException(e);
           }
         },
         config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
@@ -779,14 +767,12 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
   {
     synchronized (statusLock) {
-      Iterator<String> iterator = workers.keySet().iterator();
-      while (iterator.hasNext()) {
-        String worker = iterator.next();
-        WorkerHolder workerHolder = workers.get(worker);
+      for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) {
+        final WorkerHolder workerHolder = worker.getValue();
         try {
           if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) {
             log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost());
-            lazyWorkers.put(worker, workerHolder);
+            lazyWorkers.put(worker.getKey(), workerHolder);
             if (lazyWorkers.size() == maxWorkers) {
               // only mark excess workers as lazy and allow their cleanup
               break;
@@ -835,7 +821,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
       return tasks.values()
                   .stream()
                   .filter(item -> item.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING)
-                  .map(item -> item.getTask())
+                  .map(HttpRemoteTaskRunnerWorkItem::getTask)
                   .collect(Collectors.toList());
     }
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 4872f20..24b4b1d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -115,8 +115,8 @@ public class SupervisorManager
 
     synchronized (lock) {
       Map<String, SupervisorSpec> supervisors = metadataSupervisorManager.getLatest();
-      for (String id : supervisors.keySet()) {
-        SupervisorSpec spec = supervisors.get(id);
+      for (Map.Entry<String, SupervisorSpec> supervisor : supervisors.entrySet()) {
+        final SupervisorSpec spec = supervisor.getValue();
         if (!(spec instanceof NoopSupervisorSpec)) {
           try {
             createAndStartSupervisorInternal(spec, false);
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
index e176734..dd3f880 100644
--- a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
@@ -95,14 +95,14 @@ class LookupListeningResource extends ListenerResource
           public Object post(final Map<String, LookupExtractorFactory> lookups)
           {
             final Map<String, LookupExtractorFactory> failedUpdates = new HashMap<>();
-            for (final String name : lookups.keySet()) {
+            for (final Map.Entry<String, LookupExtractorFactory> lookup : lookups.entrySet()) {
 
               final LookupExtractorFactoryContainer factoryContainer = new LookupExtractorFactoryContainer(
                   null,
-                  lookups.get(name)
+                  lookup.getValue()
               );
 
-              manager.add(name, factoryContainer);
+              manager.add(lookup.getKey(), factoryContainer);
             }
             return ImmutableMap.of("status", "accepted", LookupModule.FAILED_UPDATES_KEY, failedUpdates);
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org