You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2021/12/04 13:13:37 UTC

[druid] branch master updated: Better serverView exec name; remove SingleServerInventoryView (#11770)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f052b4  Better serverView exec name; remove SingleServerInventoryView (#11770)
1f052b4 is described below

commit 1f052b43c5d7b6e0ca578e8c2597e687a6a8ab92
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Sat Dec 4 05:13:05 2021 -0800

    Better serverView exec name; remove SingleServerInventoryView (#11770)
    
    Druid currently has two serverViews: the regular serverView and the filtered serverView. The regular serverView is used to monitor all segment announcements from all data nodes (historicals, tasks, indexers). The filtered serverView is used when you want to watch segment announcements from particular tiers. Since these server views keep track of different sets of druidServers and segments in memory, they should be maintained separately. However, they currently share the same name for their exec [...]
    
    This PR also removes SingleServerInventoryView. This view was deprecated a long time ago and has not been documented since at least 0.13 (#6127). I also don't think it is better than BatchServerInventoryView in any case. Finally, I merged AbstractCuratorServerInventoryView into BatchServerInventoryView, as we no longer need AbstractCuratorServerInventoryView after SingleServerInventoryView is removed.
---
 .../materializedview/DatasourceOptimizerTest.java  |   2 +-
 .../client/AbstractCuratorServerInventoryView.java | 335 ---------------------
 .../druid/client/BatchServerInventoryView.java     | 313 ++++++++++++++++---
 .../client/BatchServerInventoryViewProvider.java   |   3 +-
 .../FilteredBatchServerInventoryViewProvider.java  |   3 +-
 .../FilteredHttpServerInventoryViewProvider.java   |   3 +-
 .../FilteredServerInventoryViewProvider.java       |   1 -
 .../FilteredSingleServerInventoryViewProvider.java |  49 ---
 .../druid/client/FilteringSegmentCallback.java     |  76 +++++
 .../druid/client/HttpServerInventoryView.java      |  25 +-
 .../client/HttpServerInventoryViewProvider.java    |   3 +-
 .../druid/client/ServerInventoryViewProvider.java  |   1 -
 .../client/SingleServerInventoryProvider.java      |  56 ----
 .../druid/client/SingleServerInventoryView.java    | 166 ----------
 .../curator/inventory/CuratorInventoryManager.java |   4 -
 .../apache/druid/client/BrokerServerViewTest.java  |   3 +-
 .../druid/client/CoordinatorServerViewTest.java    |   3 +-
 .../druid/client/HttpServerInventoryViewTest.java  |   3 +-
 .../client/BatchServerInventoryViewTest.java       |   6 +-
 .../coordinator/CuratorDruidCoordinatorTest.java   |   3 +-
 .../server/coordinator/DruidCoordinatorTest.java   |   6 +-
 .../calcite/schema/DruidSchemaConcurrencyTest.java |   2 +-
 22 files changed, 386 insertions(+), 680 deletions(-)

diff --git a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
index bd070e3..4f0d387 100644
--- a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
+++ b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
@@ -259,7 +259,7 @@ public class DatasourceOptimizerTest extends CuratorTestBase
 
   private void setupViews() throws Exception
   {
-    baseView = new BatchServerInventoryView(zkPathsConfig, curator, jsonMapper, Predicates.alwaysTrue())
+    baseView = new BatchServerInventoryView(zkPathsConfig, curator, jsonMapper, Predicates.alwaysTrue(), "test")
     {
       @Override
       public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
diff --git a/server/src/main/java/org/apache/druid/client/AbstractCuratorServerInventoryView.java b/server/src/main/java/org/apache/druid/client/AbstractCuratorServerInventoryView.java
deleted file mode 100644
index 48d1ace..0000000
--- a/server/src/main/java/org/apache/druid/client/AbstractCuratorServerInventoryView.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.druid.curator.inventory.CuratorInventoryManager;
-import org.apache.druid.curator.inventory.CuratorInventoryManagerStrategy;
-import org.apache.druid.curator.inventory.InventoryManagerConfig;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- */
-public abstract class AbstractCuratorServerInventoryView<InventoryType> implements ServerInventoryView
-{
-
-  private final EmittingLogger log;
-  private final CuratorInventoryManager<DruidServer, InventoryType> inventoryManager;
-  private final AtomicBoolean started = new AtomicBoolean(false);
-
-  private final ConcurrentMap<ServerRemovedCallback, Executor> serverRemovedCallbacks = new ConcurrentHashMap<>();
-  private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<>();
-
-  public AbstractCuratorServerInventoryView(
-      final EmittingLogger log,
-      final String announcementsPath,
-      final String inventoryPath,
-      final CuratorFramework curator,
-      final ObjectMapper jsonMapper,
-      final TypeReference<InventoryType> typeReference
-  )
-  {
-    this.log = log;
-    this.inventoryManager = new CuratorInventoryManager<>(
-        curator,
-        new InventoryManagerConfig()
-        {
-          @Override
-          public String getContainerPath()
-          {
-            return announcementsPath;
-          }
-
-          @Override
-          public String getInventoryPath()
-          {
-            return inventoryPath;
-          }
-        },
-        Execs.singleThreaded("ServerInventoryView-%s"),
-        new CuratorInventoryManagerStrategy<DruidServer, InventoryType>()
-        {
-          @Override
-          public DruidServer deserializeContainer(byte[] bytes)
-          {
-            try {
-              return jsonMapper.readValue(bytes, DruidServer.class);
-            }
-            catch (IOException e) {
-              throw new RuntimeException(e);
-            }
-          }
-
-          @Override
-          public InventoryType deserializeInventory(byte[] bytes)
-          {
-            try {
-              return jsonMapper.readValue(bytes, typeReference);
-            }
-            catch (IOException e) {
-              log.error(e, "Could not parse json: %s", StringUtils.fromUtf8(bytes));
-              throw new RuntimeException(e);
-            }
-          }
-
-          @Override
-          public void newContainer(DruidServer container)
-          {
-            log.info("New Server[%s]", container);
-          }
-
-          @Override
-          public void deadContainer(DruidServer deadContainer)
-          {
-            log.info("Server Disappeared[%s]", deadContainer);
-            runServerRemovedCallbacks(deadContainer);
-          }
-
-          @Override
-          public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer)
-          {
-            return newContainer.addDataSegments(oldContainer);
-          }
-
-          @Override
-          public DruidServer addInventory(
-              final DruidServer container,
-              String inventoryKey,
-              final InventoryType inventory
-          )
-          {
-            return addInnerInventory(container, inventoryKey, inventory);
-          }
-
-          @Override
-          public DruidServer updateInventory(DruidServer container, String inventoryKey, InventoryType inventory)
-          {
-            return updateInnerInventory(container, inventoryKey, inventory);
-          }
-
-          @Override
-          public DruidServer removeInventory(final DruidServer container, String inventoryKey)
-          {
-            return removeInnerInventory(container, inventoryKey);
-          }
-
-          @Override
-          public void inventoryInitialized()
-          {
-            log.info("Inventory Initialized");
-            runSegmentCallbacks(
-                input -> input.segmentViewInitialized()
-            );
-          }
-        }
-    );
-  }
-
-  @LifecycleStart
-  public void start() throws Exception
-  {
-    synchronized (started) {
-      if (!started.get()) {
-        inventoryManager.start();
-        started.set(true);
-      }
-    }
-  }
-
-  @LifecycleStop
-  public void stop() throws IOException
-  {
-    synchronized (started) {
-      if (started.getAndSet(false)) {
-        inventoryManager.stop();
-      }
-    }
-  }
-
-  @Override
-  public boolean isStarted()
-  {
-    return started.get();
-  }
-
-  @Override
-  public DruidServer getInventoryValue(String containerKey)
-  {
-    return inventoryManager.getInventoryValue(containerKey);
-  }
-
-  @Override
-  public Collection<DruidServer> getInventory()
-  {
-    return inventoryManager.getInventory();
-  }
-
-  @Override
-  public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
-  {
-    serverRemovedCallbacks.put(callback, exec);
-  }
-
-  @Override
-  public void registerSegmentCallback(Executor exec, SegmentCallback callback)
-  {
-    segmentCallbacks.put(callback, exec);
-  }
-
-  public InventoryManagerConfig getInventoryManagerConfig()
-  {
-    return inventoryManager.getConfig();
-  }
-
-  protected void runSegmentCallbacks(
-      final Function<SegmentCallback, CallbackAction> fn
-  )
-  {
-    for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
-      entry.getValue().execute(
-          () -> {
-            if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
-              segmentCallbackRemoved(entry.getKey());
-              segmentCallbacks.remove(entry.getKey());
-            }
-          }
-      );
-    }
-  }
-
-  private void runServerRemovedCallbacks(final DruidServer server)
-  {
-    for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverRemovedCallbacks.entrySet()) {
-      entry.getValue().execute(
-          () -> {
-            if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
-              serverRemovedCallbacks.remove(entry.getKey());
-            }
-          }
-      );
-    }
-  }
-
-  protected void addSingleInventory(final DruidServer container, final DataSegment inventory)
-  {
-    log.debug("Server[%s] added segment[%s]", container.getName(), inventory.getId());
-
-    if (container.getSegment(inventory.getId()) != null) {
-      log.warn(
-          "Not adding or running callbacks for existing segment[%s] on server[%s]",
-          inventory.getId(),
-          container.getName()
-      );
-
-      return;
-    }
-
-    container.addDataSegment(inventory);
-
-    runSegmentCallbacks(
-        input -> input.segmentAdded(container.getMetadata(), inventory)
-    );
-  }
-
-  void removeSingleInventory(DruidServer container, SegmentId segmentId)
-  {
-    log.debug("Server[%s] removed segment[%s]", container.getName(), segmentId);
-    if (!doRemoveSingleInventory(container, segmentId)) {
-      log.warn(
-          "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
-          segmentId,
-          container.getName()
-      );
-    }
-  }
-
-  void removeSingleInventory(final DruidServer container, String segmentId)
-  {
-    log.debug("Server[%s] removed segment[%s]", container.getName(), segmentId);
-    for (SegmentId possibleSegmentId : SegmentId.iterateAllPossibleParsings(segmentId)) {
-      if (doRemoveSingleInventory(container, possibleSegmentId)) {
-        return;
-      }
-    }
-    log.warn(
-        "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
-        segmentId,
-        container.getName()
-    );
-  }
-
-  private boolean doRemoveSingleInventory(DruidServer container, SegmentId segmentId)
-  {
-    DataSegment segment = container.removeDataSegment(segmentId);
-    if (segment != null) {
-      runSegmentCallbacks(
-          input -> input.segmentRemoved(container.getMetadata(), segment)
-      );
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
-  {
-    try {
-      DruidServer server = getInventoryValue(serverKey);
-      return server != null && server.getSegment(segment.getId()) != null;
-    }
-    catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  protected abstract DruidServer addInnerInventory(
-      DruidServer container,
-      String inventoryKey,
-      InventoryType inventory
-  );
-
-  protected abstract DruidServer updateInnerInventory(
-      DruidServer container,
-      String inventoryKey,
-      InventoryType inventory
-  );
-
-  protected abstract DruidServer removeInnerInventory(DruidServer container, String inventoryKey);
-
-  protected abstract void segmentCallbackRemoved(SegmentCallback callback);
-}
diff --git a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
index d3b254f..00dfbce 100644
--- a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
@@ -27,59 +27,295 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import com.google.inject.Inject;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.druid.curator.inventory.CuratorInventoryManager;
+import org.apache.druid.curator.inventory.CuratorInventoryManagerStrategy;
+import org.apache.druid.curator.inventory.InventoryManagerConfig;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This class is deprecated. Use {@link HttpServerInventoryView} instead.
  */
 @Deprecated
 @ManageLifecycle
-public class BatchServerInventoryView extends AbstractCuratorServerInventoryView<Set<DataSegment>>
-    implements FilteredServerInventoryView
+public class BatchServerInventoryView implements ServerInventoryView, FilteredServerInventoryView
 {
   private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
 
+  private final CuratorInventoryManager<DruidServer, Set<DataSegment>> inventoryManager;
+  private final AtomicBoolean started = new AtomicBoolean(false);
+
+  private final ConcurrentMap<ServerRemovedCallback, Executor> serverRemovedCallbacks = new ConcurrentHashMap<>();
+  private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<>();
+
   private final ConcurrentMap<String, Set<DataSegment>> zNodes = new ConcurrentHashMap<>();
   private final ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates =
       new ConcurrentHashMap<>();
   private final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;
 
-  @Inject
   public BatchServerInventoryView(
       final ZkPathsConfig zkPaths,
       final CuratorFramework curator,
       final ObjectMapper jsonMapper,
-      final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter
+      final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter,
+      final String pathChildrenCacheExecPrefix
   )
   {
-    super(
-        log,
-        zkPaths.getAnnouncementsPath(),
-        zkPaths.getLiveSegmentsPath(),
+    this.inventoryManager = new CuratorInventoryManager<>(
         curator,
-        jsonMapper,
-        new TypeReference<Set<DataSegment>>()
+        new InventoryManagerConfig()
         {
+          @Override
+          public String getContainerPath()
+          {
+            return zkPaths.getAnnouncementsPath();
+          }
+
+          @Override
+          public String getInventoryPath()
+          {
+            return zkPaths.getLiveSegmentsPath();
+          }
+        },
+        Execs.singleThreaded(pathChildrenCacheExecPrefix + "-%s"),
+        new CuratorInventoryManagerStrategy<DruidServer, Set<DataSegment>>()
+        {
+          @Override
+          public DruidServer deserializeContainer(byte[] bytes)
+          {
+            try {
+              return jsonMapper.readValue(bytes, DruidServer.class);
+            }
+            catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          @Override
+          public Set<DataSegment> deserializeInventory(byte[] bytes)
+          {
+            try {
+              return jsonMapper.readValue(bytes, new TypeReference<Set<DataSegment>>()
+              {
+              });
+            }
+            catch (IOException e) {
+              log.error(e, "Could not parse json: %s", StringUtils.fromUtf8(bytes));
+              throw new RuntimeException(e);
+            }
+          }
+
+          @Override
+          public void newContainer(DruidServer container)
+          {
+            log.info("New Server[%s]", container);
+          }
+
+          @Override
+          public void deadContainer(DruidServer deadContainer)
+          {
+            log.info("Server Disappeared[%s]", deadContainer);
+            runServerRemovedCallbacks(deadContainer);
+          }
+
+          @Override
+          public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer)
+          {
+            return newContainer.addDataSegments(oldContainer);
+          }
+
+          @Override
+          public DruidServer addInventory(
+              final DruidServer container,
+              String inventoryKey,
+              final Set<DataSegment> inventory
+          )
+          {
+            return addInnerInventory(container, inventoryKey, inventory);
+          }
+
+          @Override
+          public DruidServer updateInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory)
+          {
+            return updateInnerInventory(container, inventoryKey, inventory);
+          }
+
+          @Override
+          public DruidServer removeInventory(final DruidServer container, String inventoryKey)
+          {
+            return removeInnerInventory(container, inventoryKey);
+          }
+
+          @Override
+          public void inventoryInitialized()
+          {
+            log.info("Inventory Initialized");
+            runSegmentCallbacks(SegmentCallback::segmentViewInitialized);
+          }
         }
     );
 
     this.defaultFilter = Preconditions.checkNotNull(defaultFilter);
   }
 
+  @LifecycleStart
+  public void start() throws Exception
+  {
+    synchronized (started) {
+      if (!started.get()) {
+        inventoryManager.start();
+        started.set(true);
+      }
+    }
+  }
+
+  @LifecycleStop
+  public void stop() throws IOException
+  {
+    synchronized (started) {
+      if (started.getAndSet(false)) {
+        inventoryManager.stop();
+      }
+    }
+  }
+
   @Override
+  public boolean isStarted()
+  {
+    return started.get();
+  }
+
+  @Override
+  public DruidServer getInventoryValue(String containerKey)
+  {
+    return inventoryManager.getInventoryValue(containerKey);
+  }
+
+  @Override
+  public Collection<DruidServer> getInventory()
+  {
+    return inventoryManager.getInventory();
+  }
+
+  @Override
+  public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
+  {
+    serverRemovedCallbacks.put(callback, exec);
+  }
+
+  @Override
+  public void registerSegmentCallback(Executor exec, SegmentCallback callback)
+  {
+    segmentCallbacks.put(callback, exec);
+  }
+
+  protected void runSegmentCallbacks(
+      final Function<SegmentCallback, CallbackAction> fn
+  )
+  {
+    for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
+      entry.getValue().execute(
+          () -> {
+            if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
+              segmentCallbackRemoved(entry.getKey());
+              segmentCallbacks.remove(entry.getKey());
+            }
+          }
+      );
+    }
+  }
+
+  private void runServerRemovedCallbacks(final DruidServer server)
+  {
+    for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverRemovedCallbacks.entrySet()) {
+      entry.getValue().execute(
+          () -> {
+            if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
+              serverRemovedCallbacks.remove(entry.getKey());
+            }
+          }
+      );
+    }
+  }
+
+  protected void addSingleInventory(final DruidServer container, final DataSegment inventory)
+  {
+    log.debug("Server[%s] added segment[%s]", container.getName(), inventory.getId());
+
+    if (container.getSegment(inventory.getId()) != null) {
+      log.warn(
+          "Not adding or running callbacks for existing segment[%s] on server[%s]",
+          inventory.getId(),
+          container.getName()
+      );
+
+      return;
+    }
+
+    container.addDataSegment(inventory);
+
+    runSegmentCallbacks(
+        input -> input.segmentAdded(container.getMetadata(), inventory)
+    );
+  }
+
+  void removeSingleInventory(DruidServer container, SegmentId segmentId)
+  {
+    log.debug("Server[%s] removed segment[%s]", container.getName(), segmentId);
+    if (!doRemoveSingleInventory(container, segmentId)) {
+      log.warn(
+          "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
+          segmentId,
+          container.getName()
+      );
+    }
+  }
+
+  private boolean doRemoveSingleInventory(DruidServer container, SegmentId segmentId)
+  {
+    DataSegment segment = container.removeDataSegment(segmentId);
+    if (segment != null) {
+      runSegmentCallbacks(
+          input -> input.segmentRemoved(container.getMetadata(), segment)
+      );
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
+  {
+    try {
+      DruidServer server = getInventoryValue(serverKey);
+      return server != null && server.getSegment(segment.getId()) != null;
+    }
+    catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
   protected DruidServer addInnerInventory(
       final DruidServer container,
       String inventoryKey,
@@ -102,36 +338,35 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView
     );
 
     // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory
-    Set<DataSegment> filteredInventory = Sets.newHashSet(Iterables.transform(
-        Iterables.filter(
-            Iterables.transform(
-                inventory,
-                new Function<DataSegment, Pair<DruidServerMetadata, DataSegment>>()
-                {
-                  @Override
-                  public Pair<DruidServerMetadata, DataSegment> apply(DataSegment input)
-                  {
-                    return Pair.of(container.getMetadata(), input);
-                  }
-                }
+    Set<DataSegment> filteredInventory = Sets.newHashSet(
+        Iterables.transform(
+            Iterables.filter(
+                Iterables.transform(
+                    inventory,
+                    new Function<DataSegment, Pair<DruidServerMetadata, DataSegment>>()
+                    {
+                      @Override
+                      public Pair<DruidServerMetadata, DataSegment> apply(DataSegment input)
+                      {
+                        return Pair.of(container.getMetadata(), input);
+                      }
+                    }
+                ),
+                predicate
             ),
-            predicate
-        ),
-        new Function<Pair<DruidServerMetadata, DataSegment>, DataSegment>()
-        {
-          @Override
-          public DataSegment apply(
-              Pair<DruidServerMetadata, DataSegment> input
-          )
-          {
-            return DataSegmentInterner.intern(input.rhs);
-          }
-        }
-    ));
+            new Function<Pair<DruidServerMetadata, DataSegment>, DataSegment>()
+            {
+              @Override
+              public DataSegment apply(Pair<DruidServerMetadata, DataSegment> input)
+              {
+                return DataSegmentInterner.intern(input.rhs);
+              }
+            }
+        )
+    );
     return filteredInventory;
   }
 
-  @Override
   protected DruidServer updateInnerInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory)
   {
     Set<DataSegment> filteredInventory = filterInventory(container, inventory);
@@ -152,7 +387,6 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView
     return container;
   }
 
-  @Override
   protected DruidServer removeInnerInventory(final DruidServer container, String inventoryKey)
   {
     log.debug("Server[%s] removed container[%s]", container.getName(), inventoryKey);
@@ -176,7 +410,7 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView
       final Predicate<Pair<DruidServerMetadata, DataSegment>> filter
   )
   {
-    SegmentCallback filteringCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter);
+    SegmentCallback filteringCallback = new FilteringSegmentCallback(callback, filter);
     segmentPredicates.put(filteringCallback, filter);
     registerSegmentCallback(
         exec,
@@ -184,7 +418,6 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView
     );
   }
 
-  @Override
   protected void segmentCallbackRemoved(SegmentCallback callback)
   {
     segmentPredicates.remove(callback);
diff --git a/server/src/main/java/org/apache/druid/client/BatchServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/BatchServerInventoryViewProvider.java
index 1686a2f..9c555d3 100644
--- a/server/src/main/java/org/apache/druid/client/BatchServerInventoryViewProvider.java
+++ b/server/src/main/java/org/apache/druid/client/BatchServerInventoryViewProvider.java
@@ -50,7 +50,8 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv
         zkPaths,
         curator,
         jsonMapper,
-        Predicates.alwaysTrue()
+        Predicates.alwaysTrue(),
+        "BatchServerInventoryView"
     );
   }
 }
diff --git a/server/src/main/java/org/apache/druid/client/FilteredBatchServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/FilteredBatchServerInventoryViewProvider.java
index d8df8d1..bb79693 100644
--- a/server/src/main/java/org/apache/druid/client/FilteredBatchServerInventoryViewProvider.java
+++ b/server/src/main/java/org/apache/druid/client/FilteredBatchServerInventoryViewProvider.java
@@ -48,7 +48,8 @@ public class FilteredBatchServerInventoryViewProvider implements FilteredServerI
         zkPaths,
         curator,
         jsonMapper,
-        Predicates.alwaysFalse()
+        Predicates.alwaysFalse(),
+        "FilteredBatchServerInventoryView"
     );
   }
 }
diff --git a/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java
index af18fe0..c614a6a 100644
--- a/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java
+++ b/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java
@@ -59,7 +59,8 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn
         httpClient,
         druidNodeDiscoveryProvider,
         Predicates.alwaysFalse(),
-        config
+        config,
+        "FilteredHttpServerInventoryView"
     );
   }
 }
diff --git a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryViewProvider.java
index 5ea3ac2..fdbb27e 100644
--- a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryViewProvider.java
+++ b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryViewProvider.java
@@ -26,7 +26,6 @@ import com.google.inject.Provider;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerInventoryViewProvider.class)
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerInventoryViewProvider.class),
     @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerInventoryViewProvider.class),
     @JsonSubTypes.Type(name = "http", value = FilteredHttpServerInventoryViewProvider.class)
 })
diff --git a/server/src/main/java/org/apache/druid/client/FilteredSingleServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/FilteredSingleServerInventoryViewProvider.java
deleted file mode 100644
index 4ae88a5..0000000
--- a/server/src/main/java/org/apache/druid/client/FilteredSingleServerInventoryViewProvider.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Predicates;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.druid.server.initialization.ZkPathsConfig;
-
-import javax.validation.constraints.NotNull;
-
-public class FilteredSingleServerInventoryViewProvider implements FilteredServerInventoryViewProvider
-{
-  @JacksonInject
-  @NotNull
-  private ZkPathsConfig zkPaths = null;
-
-  @JacksonInject
-  @NotNull
-  private CuratorFramework curator = null;
-
-  @JacksonInject
-  @NotNull
-  private ObjectMapper jsonMapper = null;
-
-  @Override
-  public SingleServerInventoryView get()
-  {
-    return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysFalse());
-  }
-}
diff --git a/server/src/main/java/org/apache/druid/client/FilteringSegmentCallback.java b/server/src/main/java/org/apache/druid/client/FilteringSegmentCallback.java
new file mode 100644
index 0000000..f2d0a22
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/FilteringSegmentCallback.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.client.ServerView.CallbackAction;
+import org.apache.druid.client.ServerView.SegmentCallback;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.timeline.DataSegment;
+
+/**
+ * A {@link SegmentCallback} that forwards events to the wrapped callback only when the given filter matches.
+ * {@link #segmentViewInitialized()} is the exception: it is always forwarded to the wrapped callback
+ * when the view is initialized, without consulting the filter.
+ * Callback methods return {@link CallbackAction#CONTINUE} when the filter does not match.
+ */
+public class FilteringSegmentCallback implements SegmentCallback
+{
+
+  private final SegmentCallback callback;
+  private final Predicate<Pair<DruidServerMetadata, DataSegment>> filter;
+
+  public FilteringSegmentCallback(SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter)
+  {
+    this.callback = callback;
+    this.filter = filter;
+  }
+
+  @Override
+  public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
+  {
+    final CallbackAction action;
+    if (filter.apply(Pair.of(server, segment))) {
+      action = callback.segmentAdded(server, segment);
+    } else {
+      action = CallbackAction.CONTINUE;
+    }
+    return action;
+  }
+
+  @Override
+  public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
+  {
+    final CallbackAction action;
+    if (filter.apply(Pair.of(server, segment))) {
+      action = callback.segmentRemoved(server, segment);
+    } else {
+      action = CallbackAction.CONTINUE;
+    }
+    return action;
+  }
+
+  @Override
+  public CallbackAction segmentViewInitialized()
+  {
+    return callback.segmentViewInitialized();
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index edb95c0..9c6f96d 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -28,14 +28,11 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Maps;
 import com.google.common.net.HostAndPort;
-import com.google.inject.Inject;
 import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.discovery.DataNodeService;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.DruidNodeDiscovery;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.guice.annotations.EscalatedGlobal;
-import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
@@ -104,19 +101,20 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
   // For each queryable server, a name -> DruidServerHolder entry is kept
   private final ConcurrentHashMap<String, DruidServerHolder> servers = new ConcurrentHashMap<>();
 
+  private final String execNamePrefix;
   private volatile ScheduledExecutorService executor;
 
   private final HttpClient httpClient;
   private final ObjectMapper smileMapper;
   private final HttpServerInventoryViewConfig config;
 
-  @Inject
   public HttpServerInventoryView(
-      final @Smile ObjectMapper smileMapper,
-      final @EscalatedGlobal HttpClient httpClient,
+      final ObjectMapper smileMapper,
+      final HttpClient httpClient,
       final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
       final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter,
-      final HttpServerInventoryViewConfig config
+      final HttpServerInventoryViewConfig config,
+      final String execNamePrefix
   )
   {
     this.httpClient = httpClient;
@@ -125,6 +123,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
     this.defaultFilter = defaultFilter;
     this.finalPredicate = defaultFilter;
     this.config = config;
+    this.execNamePrefix = execNamePrefix;
   }
 
 
@@ -136,12 +135,12 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
         throw new ISE("can't start.");
       }
 
-      log.info("Starting HttpServerInventoryView.");
+      log.info("Starting %s.", execNamePrefix);
 
       try {
         executor = ScheduledExecutors.fixed(
             config.getNumThreads(),
-            "HttpServerInventoryView-%s"
+            execNamePrefix + "-%s"
         );
 
         DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY);
@@ -194,7 +193,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
         lifecycleLock.exitStart();
       }
 
-      log.info("Started HttpServerInventoryView.");
+      log.info("Started %s.", execNamePrefix);
     }
   }
 
@@ -206,13 +205,13 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
         throw new ISE("can't stop.");
       }
 
-      log.info("Stopping HttpServerInventoryView.");
+      log.info("Stopping %s.", execNamePrefix);
 
       if (executor != null) {
         executor.shutdownNow();
       }
 
-      log.info("Stopped HttpServerInventoryView.");
+      log.info("Stopped %s.", execNamePrefix);
     }
   }
 
@@ -227,7 +226,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
       throw new ISE("Lifecycle has already started.");
     }
 
-    SegmentCallback filteringSegmentCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter);
+    SegmentCallback filteringSegmentCallback = new FilteringSegmentCallback(callback, filter);
     segmentCallbacks.put(filteringSegmentCallback, exec);
     segmentPredicates.put(filteringSegmentCallback, filter);
 
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java
index 9311292..7f008c5 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java
@@ -59,7 +59,8 @@ public class HttpServerInventoryViewProvider implements ServerInventoryViewProvi
         httpClient,
         druidNodeDiscoveryProvider,
         Predicates.alwaysTrue(),
-        config
+        config,
+        "HttpServerInventoryView"
     );
   }
 }
diff --git a/server/src/main/java/org/apache/druid/client/ServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/ServerInventoryViewProvider.java
index ad34f70..bc87500 100644
--- a/server/src/main/java/org/apache/druid/client/ServerInventoryViewProvider.java
+++ b/server/src/main/java/org/apache/druid/client/ServerInventoryViewProvider.java
@@ -27,7 +27,6 @@ import com.google.inject.Provider;
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = BatchServerInventoryViewProvider.class)
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = "legacy", value = SingleServerInventoryProvider.class),
     @JsonSubTypes.Type(name = "batch", value = BatchServerInventoryViewProvider.class),
     @JsonSubTypes.Type(name = "http", value = HttpServerInventoryViewProvider.class),
 })
diff --git a/server/src/main/java/org/apache/druid/client/SingleServerInventoryProvider.java b/server/src/main/java/org/apache/druid/client/SingleServerInventoryProvider.java
deleted file mode 100644
index 3e2ad48..0000000
--- a/server/src/main/java/org/apache/druid/client/SingleServerInventoryProvider.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Predicates;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.druid.server.initialization.ZkPathsConfig;
-
-import javax.validation.constraints.NotNull;
-
-/**
- */
-public class SingleServerInventoryProvider implements ServerInventoryViewProvider
-{
-  @JacksonInject
-  @NotNull
-  private ZkPathsConfig zkPaths = null;
-
-  @JacksonInject
-  @NotNull
-  private CuratorFramework curator = null;
-
-  @JacksonInject
-  @NotNull
-  private ObjectMapper jsonMapper = null;
-
-  @Override
-  public ServerInventoryView get()
-  {
-    return new SingleServerInventoryView(
-        zkPaths,
-        curator,
-        jsonMapper,
-        Predicates.alwaysTrue()
-    );
-  }
-}
diff --git a/server/src/main/java/org/apache/druid/client/SingleServerInventoryView.java b/server/src/main/java/org/apache/druid/client/SingleServerInventoryView.java
deleted file mode 100644
index 0b59137..0000000
--- a/server/src/main/java/org/apache/druid/client/SingleServerInventoryView.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.inject.Inject;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.initialization.ZkPathsConfig;
-import org.apache.druid.timeline.DataSegment;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-
-/**
- * This class is deprecated. Use {@link HttpServerInventoryView} instead.
- */
-@Deprecated
-@ManageLifecycle
-public class SingleServerInventoryView extends AbstractCuratorServerInventoryView<DataSegment> implements FilteredServerInventoryView
-{
-  private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
-
-  private final ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates =
-      new ConcurrentHashMap<>();
-  private final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;
-
-  @Inject
-  public SingleServerInventoryView(
-      final ZkPathsConfig zkPaths,
-      final CuratorFramework curator,
-      final ObjectMapper jsonMapper,
-      final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter
-  )
-  {
-    super(
-        log,
-        zkPaths.getAnnouncementsPath(),
-        zkPaths.getServedSegmentsPath(),
-        curator,
-        jsonMapper,
-        new TypeReference<DataSegment>()
-        {
-        }
-    );
-
-    Preconditions.checkNotNull(defaultFilter);
-    this.defaultFilter = defaultFilter;
-  }
-
-  @Override
-  protected DruidServer addInnerInventory(DruidServer container, String inventoryKey, DataSegment inventory)
-  {
-    Predicate<Pair<DruidServerMetadata, DataSegment>> predicate = Predicates.or(
-        defaultFilter,
-        Predicates.or(segmentPredicates.values())
-    );
-    if (predicate.apply(Pair.of(container.getMetadata(), inventory))) {
-      addSingleInventory(container, DataSegmentInterner.intern(inventory));
-    }
-    return container;
-  }
-
-  @Override
-  protected DruidServer updateInnerInventory(DruidServer container, String inventoryKey, DataSegment inventory)
-  {
-    return addInnerInventory(container, inventoryKey, inventory);
-  }
-
-  @Override
-  protected DruidServer removeInnerInventory(DruidServer container, String inventoryKey)
-  {
-    removeSingleInventory(container, inventoryKey);
-    return container;
-  }
-
-  @Override
-  public void registerSegmentCallback(
-      final Executor exec,
-      final SegmentCallback callback,
-      final Predicate<Pair<DruidServerMetadata, DataSegment>> filter
-  )
-  {
-    SegmentCallback filteringCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter);
-    segmentPredicates.put(filteringCallback, filter);
-    registerSegmentCallback(
-        exec,
-        filteringCallback
-    );
-  }
-
-  @Override
-  protected void segmentCallbackRemoved(SegmentCallback callback)
-  {
-    segmentPredicates.remove(callback);
-  }
-
-  public static class FilteringSegmentCallback implements SegmentCallback
-  {
-
-    private final SegmentCallback callback;
-    private final Predicate<Pair<DruidServerMetadata, DataSegment>> filter;
-
-    public FilteringSegmentCallback(SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter)
-    {
-      this.callback = callback;
-      this.filter = filter;
-    }
-
-    @Override
-    public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
-    {
-      final CallbackAction action;
-      if (filter.apply(Pair.of(server, segment))) {
-        action = callback.segmentAdded(server, segment);
-      } else {
-        action = CallbackAction.CONTINUE;
-      }
-      return action;
-    }
-
-    @Override
-    public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
-    {
-      final CallbackAction action;
-      if (filter.apply(Pair.of(server, segment))) {
-        action = callback.segmentRemoved(server, segment);
-      } else {
-        action = CallbackAction.CONTINUE;
-      }
-      return action;
-    }
-
-    @Override
-    public CallbackAction segmentViewInitialized()
-    {
-      return callback.segmentViewInitialized();
-    }
-  }
-
-}
diff --git a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
index 88c9d8f..5df07e9 100644
--- a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
+++ b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
@@ -29,8 +29,6 @@ import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.curator.cache.PathChildrenCacheFactory;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
 
 import javax.annotation.Nullable;
@@ -100,7 +98,6 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
         .build();
   }
 
-  @LifecycleStart
   public void start() throws Exception
   {
     PathChildrenCache childrenCache;
@@ -131,7 +128,6 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
     }
   }
 
-  @LifecycleStop
   public void stop() throws IOException
   {
     synchronized (lock) {
diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index 0a0e6b8..5855c9d 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -599,7 +599,8 @@ public class BrokerServerViewTest extends CuratorTestBase
         zkPathsConfig,
         curator,
         jsonMapper,
-        Predicates.alwaysTrue()
+        Predicates.alwaysTrue(),
+        "test"
     )
     {
       @Override
diff --git a/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java
index 961303c..5a36fec 100644
--- a/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java
@@ -292,7 +292,8 @@ public class CoordinatorServerViewTest extends CuratorTestBase
         zkPathsConfig,
         curator,
         jsonMapper,
-        Predicates.alwaysTrue()
+        Predicates.alwaysTrue(),
+        "test"
     )
     {
       @Override
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index f0fd54a..896f90d 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -182,7 +182,8 @@ public class HttpServerInventoryViewTest
         httpClient,
         druidNodeDiscoveryProvider,
         (pair) -> !pair.rhs.getDataSource().equals("non-loading-datasource"),
-        new HttpServerInventoryViewConfig(null, null, null)
+        new HttpServerInventoryViewConfig(null, null, null),
+        "test"
     );
 
     CountDownLatch initializeCallback1 = new CountDownLatch(1);
diff --git a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
index 9ba4466..25ac1ba 100644
--- a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
@@ -181,7 +181,8 @@ public class BatchServerInventoryViewTest
         },
         cf,
         jsonMapper,
-        Predicates.alwaysTrue()
+        Predicates.alwaysTrue(),
+        "test"
     );
 
     batchServerInventoryView.start();
@@ -204,7 +205,8 @@ public class BatchServerInventoryViewTest
           {
             return input.rhs.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS));
           }
-        }
+        },
+        "test"
     )
     {
       @Override
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index f830408..3adbbd6 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -476,7 +476,8 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
         zkPathsConfig,
         curator,
         jsonMapper,
-        Predicates.alwaysTrue()
+        Predicates.alwaysTrue(),
+        "test"
     )
     {
       @Override
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 0a2db2c..ec0201b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -29,12 +29,12 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.druid.client.BatchServerInventoryView;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.ImmutableDruidServer;
-import org.apache.druid.client.SingleServerInventoryView;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.curator.CuratorTestBase;
 import org.apache.druid.curator.CuratorUtils;
@@ -95,7 +95,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
   private DataSourcesSnapshot dataSourcesSnapshot;
   private DruidCoordinatorRuntimeParams coordinatorRuntimeParams;
 
-  private SingleServerInventoryView serverInventoryView;
+  private BatchServerInventoryView serverInventoryView;
   private ScheduledExecutorFactory scheduledExecutorFactory;
   private DruidServer druidServer;
   private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
@@ -113,7 +113,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
   public void setUp() throws Exception
   {
     druidServer = EasyMock.createMock(DruidServer.class);
-    serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
+    serverInventoryView = EasyMock.createMock(BatchServerInventoryView.class);
     segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
     dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
     coordinatorRuntimeParams = EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java
index 6f0d0b4..67a626d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java
@@ -29,10 +29,10 @@ import org.apache.druid.client.BrokerSegmentWatcherConfig;
 import org.apache.druid.client.BrokerServerView;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.FilteredServerInventoryView;
+import org.apache.druid.client.FilteringSegmentCallback;
 import org.apache.druid.client.ServerView.CallbackAction;
 import org.apache.druid.client.ServerView.SegmentCallback;
 import org.apache.druid.client.ServerView.ServerRemovedCallback;
-import org.apache.druid.client.SingleServerInventoryView.FilteringSegmentCallback;
 import org.apache.druid.client.TimelineServerView.TimelineCallback;
 import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
 import org.apache.druid.client.selector.RandomServerSelectorStrategy;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org