You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/12/04 01:23:23 UTC

[GitHub] [druid] jihoonson commented on a change in pull request #11770: Better serverView exec name; remove SingleServerInventoryView

jihoonson commented on a change in pull request #11770:
URL: https://github.com/apache/druid/pull/11770#discussion_r762366132



##########
File path: server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
##########
@@ -27,59 +27,295 @@
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import com.google.inject.Inject;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.druid.curator.inventory.CuratorInventoryManager;
+import org.apache.druid.curator.inventory.CuratorInventoryManagerStrategy;
+import org.apache.druid.curator.inventory.InventoryManagerConfig;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This class is deprecated. Use {@link HttpServerInventoryView} instead.
  */
 @Deprecated
 @ManageLifecycle
-public class BatchServerInventoryView extends AbstractCuratorServerInventoryView<Set<DataSegment>>
-    implements FilteredServerInventoryView
+public class BatchServerInventoryView implements ServerInventoryView, FilteredServerInventoryView
 {
   private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
 
+  private final CuratorInventoryManager<DruidServer, Set<DataSegment>> inventoryManager;
+  private final AtomicBoolean started = new AtomicBoolean(false);
+
+  private final ConcurrentMap<ServerRemovedCallback, Executor> serverRemovedCallbacks = new ConcurrentHashMap<>();
+  private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<>();
+
   private final ConcurrentMap<String, Set<DataSegment>> zNodes = new ConcurrentHashMap<>();
   private final ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates =
       new ConcurrentHashMap<>();
   private final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;
 
-  @Inject
   public BatchServerInventoryView(
       final ZkPathsConfig zkPaths,
       final CuratorFramework curator,
       final ObjectMapper jsonMapper,
-      final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter
+      final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter,
+      final String pathChildrenCacheExecPrefix
   )
   {
-    super(
-        log,
-        zkPaths.getAnnouncementsPath(),
-        zkPaths.getLiveSegmentsPath(),
+    this.inventoryManager = new CuratorInventoryManager<>(
         curator,
-        jsonMapper,
-        new TypeReference<Set<DataSegment>>()
+        new InventoryManagerConfig()
         {
+          @Override
+          public String getContainerPath()
+          {
+            return zkPaths.getAnnouncementsPath();
+          }
+
+          @Override
+          public String getInventoryPath()
+          {
+            return zkPaths.getLiveSegmentsPath();
+          }
+        },
+        Execs.singleThreaded(pathChildrenCacheExecPrefix + "-%s"),
+        new CuratorInventoryManagerStrategy<DruidServer, Set<DataSegment>>()
+        {
+          @Override
+          public DruidServer deserializeContainer(byte[] bytes)
+          {
+            try {
+              return jsonMapper.readValue(bytes, DruidServer.class);
+            }
+            catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          @Override
+          public Set<DataSegment> deserializeInventory(byte[] bytes)
+          {
+            try {
+              return jsonMapper.readValue(bytes, new TypeReference<Set<DataSegment>>()
+              {
+              });
+            }
+            catch (IOException e) {
+              log.error(e, "Could not parse json: %s", StringUtils.fromUtf8(bytes));
+              throw new RuntimeException(e);
+            }
+          }
+
+          @Override
+          public void newContainer(DruidServer container)
+          {
+            log.info("New Server[%s]", container);
+          }
+
+          @Override
+          public void deadContainer(DruidServer deadContainer)
+          {
+            log.info("Server Disappeared[%s]", deadContainer);
+            runServerRemovedCallbacks(deadContainer);
+          }
+
+          @Override
+          public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer)
+          {
+            return newContainer.addDataSegments(oldContainer);
+          }
+
+          @Override
+          public DruidServer addInventory(
+              final DruidServer container,
+              String inventoryKey,
+              final Set<DataSegment> inventory
+          )
+          {
+            return addInnerInventory(container, inventoryKey, inventory);
+          }
+
+          @Override
+          public DruidServer updateInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory)
+          {
+            return updateInnerInventory(container, inventoryKey, inventory);
+          }
+
+          @Override
+          public DruidServer removeInventory(final DruidServer container, String inventoryKey)
+          {
+            return removeInnerInventory(container, inventoryKey);
+          }
+
+          @Override
+          public void inventoryInitialized()
+          {
+            log.info("Inventory Initialized");
+            runSegmentCallbacks(SegmentCallback::segmentViewInitialized);
+          }
         }
     );
 
     this.defaultFilter = Preconditions.checkNotNull(defaultFilter);
   }
 
+  @LifecycleStart
+  public void start() throws Exception
+  {
+    synchronized (started) {
+      if (!started.get()) {
+        inventoryManager.start();
+        started.set(true);
+      }
+    }
+  }
+
+  @LifecycleStop
+  public void stop() throws IOException
+  {
+    synchronized (started) {
+      if (started.getAndSet(false)) {
+        inventoryManager.stop();
+      }
+    }
+  }
+
   @Override
+  public boolean isStarted()
+  {
+    return started.get();
+  }
+
+  @Override
+  public DruidServer getInventoryValue(String containerKey)

Review comment:
       i think you are right, but am not so sure why it's named that way. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org