Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/12/01 00:12:58 UTC

[GitHub] leventov closed pull request #6606: Simplify DruidNodeDiscoveryProvider; add DruidNodeDiscovery.Listener.nodeViewInitialized()

URL: https://github.com/apache/incubator-druid/pull/6606
 
 
   

This is a PR merged from a forked repository. GitHub hides the original diff
once a pull request from a fork is merged, so it is reproduced below for the
sake of provenance:

diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index 6bb556d69ac..dfedeb42e9e 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -76,6 +76,7 @@
     <inspection_tool class="InvalidComparatorMethodReference" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="IteratorHasNextCallsIteratorNext" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="IteratorNextDoesNotThrowNoSuchElementException" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="Java8MapForEach" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="JsonDuplicatePropertyKeys" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="JsonStandardCompliance" enabled="true" level="WARNING" enabled_by_default="true" />
     <inspection_tool class="LengthOneStringInIndexOf" enabled="true" level="ERROR" enabled_by_default="true" />
@@ -229,6 +230,7 @@
         <constraint name="E" within="" contains="" />
       </searchConfiguration>
     </inspection_tool>
+    <inspection_tool class="SimplifyStreamApiCallChains" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
       <option name="processCode" value="true" />
       <option name="processLiterals" value="true" />
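
For context, the two inspections enabled above flag patterns like the following. This snippet is illustrative only (it is not code from this PR, and the class and variable names are made up):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    class InspectionExamples
    {
      static void demo()
      {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("a", 1);

        // Java8MapForEach: iterating entrySet() when only key and value are needed...
        counts.entrySet().forEach(e -> System.out.println(e.getKey() + "=" + e.getValue()));
        // ...should use the Map.forEach overload instead:
        counts.forEach((key, value) -> System.out.println(key + "=" + value));

        // SimplifyStreamApiCallChains: a stream pipeline that merely copies a collection...
        List<Integer> copy1 = counts.values().stream().collect(Collectors.toList());
        // ...should be a plain copy constructor:
        List<Integer> copy2 = new ArrayList<>(counts.values());
      }
    }
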
diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
index 8b486cb7503..d2da5a705e0 100644
--- a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
+++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
@@ -140,7 +140,7 @@
   private final AtomicInteger allocatedBuffers = new AtomicInteger();
   private final AtomicInteger droppedBuffers = new AtomicInteger();
 
-  private volatile long lastFillTimeMillis;
+  private volatile long lastBatchFillTimeMillis;
   private final ConcurrentTimeCounter batchFillingTimeCounter = new ConcurrentTimeCounter();
 
   private final Object startLock = new Object();
@@ -180,8 +180,8 @@ public HttpPostEmitter(HttpEmitterConfig config, AsyncHttpClient client, ObjectM
     emittingThread = new EmittingThread(config);
     long firstBatchNumber = 1;
     concurrentBatch.set(new Batch(this, acquireBuffer(), firstBatchNumber));
-    // lastFillTimeMillis must not be 0, minHttpTimeoutMillis could be.
-    lastFillTimeMillis = Math.max(config.minHttpTimeoutMillis, 1);
+    // lastBatchFillTimeMillis must not be 0, minHttpTimeoutMillis could be.
+    lastBatchFillTimeMillis = Math.max(config.minHttpTimeoutMillis, 1);
   }
 
   @Override
@@ -328,7 +328,7 @@ private void doOnSealExclusive(Batch batch, long elapsedTimeMillis)
     if (elapsedTimeMillis > 0) {
       // If elapsedTimeMillis is 0 or negative, it's likely because System.currentTimeMillis() is not monotonic, so not
       // accounting this time for determining batch sending timeout.
-      lastFillTimeMillis = elapsedTimeMillis;
+      lastBatchFillTimeMillis = elapsedTimeMillis;
     }
     addBatchToEmitQueue(batch);
     wakeUpEmittingThread();
@@ -663,7 +663,7 @@ private void tryEmitAndDrainAllFailedBuffers()
      */
     private boolean sendWithRetries(final byte[] buffer, final int length, final int eventCount, boolean withTimeout)
     {
-      long deadLineMillis = System.currentTimeMillis() + sendRequestTimeoutMillis(lastFillTimeMillis);
+      long deadLineMillis = System.currentTimeMillis() + computeTimeoutForSendRequestInMillis(lastBatchFillTimeMillis);
       try {
         RetryUtils.retry(
             new RetryUtils.Task<Object>()
@@ -709,8 +709,8 @@ public boolean apply(Throwable e)
 
     private void send(byte[] buffer, int length) throws Exception
     {
-      long lastFillTimeMillis = HttpPostEmitter.this.lastFillTimeMillis;
-      final long timeoutMillis = sendRequestTimeoutMillis(lastFillTimeMillis);
+      long lastFillTimeMillis = HttpPostEmitter.this.lastBatchFillTimeMillis;
+      final long timeoutMillis = computeTimeoutForSendRequestInMillis(lastFillTimeMillis);
       if (timeoutMillis < config.getMinHttpTimeoutMillis()) {
         throw timeoutLessThanMinimumException;
       }
@@ -795,18 +795,27 @@ private void send(byte[] buffer, int length) throws Exception
       accountSuccessfulSending(sendingStartMs);
     }
 
-    private long sendRequestTimeoutMillis(long lastFillTimeMillis)
+    /**
+     * Computes the timeout for sending a batch of events over HTTP, based on how much time it took to populate that
+     * batch. The idea is that if it took X milliseconds to fill the batch, we shouldn't wait more than
+     * X * {@link HttpEmitterConfig#httpTimeoutAllowanceFactor} milliseconds to send that data, because the next batch
+     * is probably being filled at the same speed, and sending has to keep up with filling.
+     *
+     * Ideally this would use a moving average instead of the plain last batch fill time, to accommodate emitting
+     * bursts, but that might be unnecessary because Druid applications might not produce events in bursts.
+     */
+    private long computeTimeoutForSendRequestInMillis(long lastBatchFillTimeMillis)
     {
       int emitQueueSize = approximateBuffersToEmitCount.get();
       if (emitQueueSize < EMIT_QUEUE_THRESHOLD_1) {
-        return (long) (lastFillTimeMillis * config.httpTimeoutAllowanceFactor);
+        return (long) (lastBatchFillTimeMillis * config.httpTimeoutAllowanceFactor);
       }
       if (emitQueueSize < EMIT_QUEUE_THRESHOLD_2) {
         // The idea is to not let buffersToEmit queue to grow faster than we can emit buffers.
-        return (long) (lastFillTimeMillis * EQUILIBRIUM_ALLOWANCE_FACTOR);
+        return (long) (lastBatchFillTimeMillis * EQUILIBRIUM_ALLOWANCE_FACTOR);
       }
       // If buffersToEmit still grows, try to restrict even more
-      return (long) (lastFillTimeMillis * TIGHT_ALLOWANCE_FACTOR);
+      return (long) (lastBatchFillTimeMillis * TIGHT_ALLOWANCE_FACTOR);
     }
 
     private void accountSuccessfulSending(long sendingStartMs)
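
The adaptive timeout logic above, reduced to a standalone sketch. The threshold and factor values here are invented for illustration; Druid's actual constants live in HttpPostEmitter:

    class SendTimeoutPolicy
    {
      // Assumed demo values; HttpPostEmitter defines its own thresholds and factors.
      private static final int EMIT_QUEUE_THRESHOLD_1 = 10;
      private static final int EMIT_QUEUE_THRESHOLD_2 = 20;
      private static final double EQUILIBRIUM_ALLOWANCE_FACTOR = 1.0;
      private static final double TIGHT_ALLOWANCE_FACTOR = 0.5;

      private final double httpTimeoutAllowanceFactor;

      SendTimeoutPolicy(double httpTimeoutAllowanceFactor)
      {
        this.httpTimeoutAllowanceFactor = httpTimeoutAllowanceFactor;
      }

      long computeTimeoutMillis(long lastBatchFillTimeMillis, int emitQueueSize)
      {
        if (emitQueueSize < EMIT_QUEUE_THRESHOLD_1) {
          // Queue is small: allow a generous multiple of the batch fill time.
          return (long) (lastBatchFillTimeMillis * httpTimeoutAllowanceFactor);
        }
        if (emitQueueSize < EMIT_QUEUE_THRESHOLD_2) {
          // Queue is growing: don't let sending fall behind batch filling.
          return (long) (lastBatchFillTimeMillis * EQUILIBRIUM_ALLOWANCE_FACTOR);
        }
        // Queue keeps growing: restrict the timeout even more.
        return (long) (lastBatchFillTimeMillis * TIGHT_ALLOWANCE_FACTOR);
      }
    }
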
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java b/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java
index 09f5dede94d..7683258f302 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java
@@ -91,7 +91,8 @@ public FileIteratingFirehoseTest(List<String> texts, int numSkipHeaderRows)
     this.expectedResults = inputs.stream()
         .map(input -> input.split("\n"))
         .flatMap(lines -> {
-          final List<String> filteredLines = Arrays.asList(lines).stream()
+          final List<String> filteredLines = Arrays
+              .stream(lines)
               .filter(line -> line.length() > 0)
               .map(line -> line.split(",")[1])
               .collect(Collectors.toList());
diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
index 812e6659cbd..07acd4a88ba 100644
--- a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
@@ -24,6 +24,7 @@
 import org.asynchttpclient.ListenableFuture;
 import org.asynchttpclient.Request;
 import org.asynchttpclient.Response;
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,21 +66,26 @@ public void setup()
   @Test
   public void timeoutEmptyQueue() throws IOException, InterruptedException
   {
+    float timeoutAllowanceFactor = 2.0f;
     final HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
         .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
-        .setHttpTimeoutAllowanceFactor(2.0f)
+        .setHttpTimeoutAllowanceFactor(timeoutAllowanceFactor)
         .build();
     final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper);
 
+    long startMs = System.currentTimeMillis();
     emitter.start();
     emitter.emitAndReturnBatch(new IntEvent());
     emitter.flush();
-    Assert.assertTrue(timeoutUsed.get() < 5);
+    long fillTimeMs = System.currentTimeMillis() - startMs;
+    Assert.assertThat((double) timeoutUsed.get(), Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
 
+    startMs = System.currentTimeMillis();
     final Batch batch = emitter.emitAndReturnBatch(new IntEvent());
     Thread.sleep(1000);
     batch.seal();
     emitter.flush();
-    Assert.assertTrue(timeoutUsed.get() >= 2000 && timeoutUsed.get() < 3000);
+    fillTimeMs = System.currentTimeMillis() - startMs;
+    Assert.assertThat((double) timeoutUsed.get(), Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
   }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index 0a4dac7fab1..75d26884249 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -86,6 +86,7 @@
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -454,9 +455,9 @@ public void run()
                 if (isPersistRequired) {
                   driver.persist(committerSupplier.get());
                 }
-                segmentsToMoveOut.entrySet().forEach(sequenceSegments -> driver.moveSegmentOut(
-                    sequenceSegments.getKey(),
-                    sequenceSegments.getValue().stream().collect(Collectors.toList())
+                segmentsToMoveOut.forEach((String sequence, Set<SegmentIdentifier> segments) -> driver.moveSegmentOut(
+                    sequence,
+                    new ArrayList<SegmentIdentifier>(segments)
                 ));
               }
               catch (ParseException e) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index bf96b106ed8..5cb99ba569a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -443,18 +443,22 @@ private void startWorkersHandling() throws InterruptedException
         new DruidNodeDiscovery.Listener()
         {
           @Override
-          public void nodesAdded(List<DiscoveryDruidNode> nodes)
+          public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
           {
-            nodes.stream().forEach(node -> addWorker(toWorker(node)));
+            nodes.forEach(node -> addWorker(toWorker(node)));
+          }
 
-            //CountDownLatch.countDown() does nothing when count has already reached 0.
-            workerViewInitialized.countDown();
+          @Override
+          public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
+          {
+            nodes.forEach(node -> removeWorker(toWorker(node)));
           }
 
           @Override
-          public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+          public void nodeViewInitialized()
           {
-            nodes.stream().forEach(node -> removeWorker(toWorker(node)));
+            //CountDownLatch.countDown() does nothing when count has already reached 0.
+            workerViewInitialized.countDown();
           }
         }
     );
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index a1308836528..25e172ae141 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -1380,6 +1380,7 @@ public void shutdownTask(String taskId)
     public void registerListener(Listener listener)
     {
       listener.nodesAdded(ImmutableList.of());
+      listener.nodeViewInitialized();
       this.listener = listener;
     }
   }
diff --git a/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java b/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java
index 9537ff89070..9903633609b 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java
@@ -102,8 +102,7 @@ public Filter toFilter()
   public HashSet<String> getRequiredColumns()
   {
     HashSet<String> requiredColumns = new HashSet<>();
-    fields.stream()
-        .forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
+    fields.forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
     return requiredColumns;
   }
 
diff --git a/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java b/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java
index ce8943139a4..d2ed4e77bbd 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java
@@ -110,8 +110,7 @@ public Filter toFilter()
   public HashSet<String> getRequiredColumns()
   {
     HashSet<String> requiredColumns = new HashSet<>();
-    fields.stream()
-        .forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
+    fields.forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
     return requiredColumns;
   }
 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index 1c7fb6c1408..8ab4601b09a 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -737,7 +737,7 @@ private static void verifyOutputNames(
   private static Map<String, AggregatorFactory> getAggregatorsMap(List<AggregatorFactory> aggregatorSpecs)
   {
     Map<String, AggregatorFactory> map = new HashMap<>(aggregatorSpecs.size());
-    aggregatorSpecs.stream().forEach(v -> map.put(v.getName(), v));
+    aggregatorSpecs.forEach(v -> map.put(v.getName(), v));
     return map;
   }
 
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
index 61dc753e778..f2f1cf01d41 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
@@ -119,7 +119,7 @@ public void testStringColumnNullHandling() throws Exception
       try (QueryableIndex index = indexIO.loadIndex(indexMerger.persist(toPersist, tempDir, indexSpec, null))) {
         final ColumnHolder columnHolder = index.getColumnHolder("d");
 
-        if (subsetList.stream().allMatch(nullFlavors::contains)) {
+        if (nullFlavors.containsAll(subsetList)) {
           // all null -> should be missing
           Assert.assertNull(subsetList.toString(), columnHolder);
         } else {
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index 855f5824822..bf0d1237d42 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -152,23 +152,23 @@ public void start() throws Exception
               private final AtomicBoolean initialized = new AtomicBoolean(false);
 
               @Override
-              public void nodesAdded(List<DiscoveryDruidNode> nodes)
+              public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
               {
-                nodes.forEach(
-                    node -> serverAdded(toDruidServer(node))
-                );
+                nodes.forEach(node -> serverAdded(toDruidServer(node)));
+              }
 
-                if (!initialized.getAndSet(true)) {
-                  executor.execute(HttpServerInventoryView.this::serverInventoryInitialized);
-                }
+              @Override
+              public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
+              {
+                nodes.forEach(node -> serverRemoved(toDruidServer(node)));
               }
 
               @Override
-              public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+              public void nodeViewInitialized()
               {
-                nodes.forEach(
-                    node -> serverRemoved(toDruidServer(node))
-                );
+                if (!initialized.getAndSet(true)) {
+                  executor.execute(HttpServerInventoryView.this::serverInventoryInitialized);
+                }
               }
 
               private DruidServer toDruidServer(DiscoveryDruidNode node)
diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
index 46b8ce912df..7025319d2de 100644
--- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
@@ -43,6 +43,7 @@
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 
+import javax.annotation.concurrent.GuardedBy;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -156,6 +157,7 @@ public void stop()
 
     // hostAndPort -> DiscoveryDruidNode
     private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
+    private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
 
     private final PathChildrenCache cache;
     private final ExecutorService cacheExecutor;
@@ -166,7 +168,7 @@ public void stop()
 
     private final Object lock = new Object();
 
-    private CountDownLatch cacheInitialized = new CountDownLatch(1);
+    private final CountDownLatch cacheInitialized = new CountDownLatch(1);
 
     NodeTypeWatcher(
         ExecutorService listenerExecutor,
@@ -195,30 +197,39 @@ public void stop()
     @Override
     public Collection<DiscoveryDruidNode> getAllNodes()
     {
-      if (!isCacheInitialized(30, TimeUnit.SECONDS)) {
+      boolean nodeViewInitialized;
+      try {
+        nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
+      }
+      catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        nodeViewInitialized = false;
+      }
+      if (!nodeViewInitialized) {
         log.info("cache is not initialized yet. getAllNodes() might not return full information.");
       }
-      return Collections.unmodifiableCollection(nodes.values());
+      return unmodifiableNodes;
     }
 
     @Override
     public void registerListener(DruidNodeDiscovery.Listener listener)
     {
       synchronized (lock) {
-        if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
-          ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
+        // No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
+        if (cacheInitialized.getCount() == 0) {
           safeSchedule(
               () -> {
-                listener.nodesAdded(currNodes);
+                listener.nodesAdded(unmodifiableNodes);
+                listener.nodeViewInitialized();
               },
-              "Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, listener
+              "Exception occured in nodesAdded([%s]) in listener [%s].", unmodifiableNodes, listener
           );
         }
         nodeListeners.add(listener);
       }
     }
 
-    public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent event)
+    void handleChildEvent(PathChildrenCacheEvent event)
     {
       synchronized (lock) {
         try {
@@ -278,20 +289,24 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve
               break;
             }
             case INITIALIZED: {
-              if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
+              // No need to wait on CountDownLatch, because we are holding the lock under which it could only be
+              // counted down.
+              if (cacheInitialized.getCount() == 0) {
                 log.warn("cache is already initialized. ignoring [%s] event.", event.getType());
                 return;
               }
 
               log.info("Received INITIALIZED in node watcher.");
 
-              ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
-              for (Listener l : nodeListeners) {
+              for (Listener listener : nodeListeners) {
                 safeSchedule(
                     () -> {
-                      l.nodesAdded(currNodes);
+                      listener.nodesAdded(unmodifiableNodes);
+                      listener.nodeViewInitialized();
                     },
-                    "Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, l
+                    "Exception occured in nodesAdded([%s]) in listener [%s].",
+                    unmodifiableNodes,
+                    listener
                 );
               }
 
@@ -309,17 +324,6 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve
       }
     }
 
-    private boolean isCacheInitialized(long waitFor, TimeUnit timeUnit)
-    {
-      try {
-        return cacheInitialized.await(waitFor, timeUnit);
-      }
-      catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        return false;
-      }
-    }
-
     private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args)
     {
       listenerExecutor.submit(() -> {
@@ -332,20 +336,20 @@ private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args
       });
     }
 
+    @GuardedBy("lock")
     private void addNode(DiscoveryDruidNode druidNode)
     {
       DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
       if (prev == null) {
-        if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
+        // No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
+        if (cacheInitialized.getCount() == 0) {
           List<DiscoveryDruidNode> newNode = ImmutableList.of(druidNode);
-          for (Listener l : nodeListeners) {
+          for (Listener listener : nodeListeners) {
             safeSchedule(
-                () -> {
-                  l.nodesAdded(newNode);
-                },
+                () -> listener.nodesAdded(newNode),
                 "Exception occured in nodeAdded(node=[%s]) in listener [%s].",
                 druidNode.getDruidNode().getHostAndPortToUse(),
-                l
+                listener
             );
           }
         }
@@ -359,6 +363,7 @@ private void addNode(DiscoveryDruidNode druidNode)
       }
     }
 
+    @GuardedBy("lock")
     private void removeNode(DiscoveryDruidNode druidNode)
     {
       DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());
@@ -372,14 +377,15 @@ private void removeNode(DiscoveryDruidNode druidNode)
         return;
       }
 
-      if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
+      // No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
+      if (cacheInitialized.getCount() == 0) {
         List<DiscoveryDruidNode> nodeRemoved = ImmutableList.of(druidNode);
-        for (Listener l : nodeListeners) {
+        for (Listener listener : nodeListeners) {
           safeSchedule(
-              () -> {
-                l.nodesRemoved(nodeRemoved);
-              },
-              "Exception occured in nodeRemoved(node=[%s]) in listener [%s].", druidNode.getDruidNode().getHostAndPortToUse(), l
+              () -> listener.nodesRemoved(nodeRemoved),
+              "Exception occured in nodeRemoved(node=[%s]) in listener [%s].",
+              druidNode.getDruidNode().getHostAndPortToUse(),
+              listener
           );
         }
       }
@@ -388,9 +394,7 @@ private void removeNode(DiscoveryDruidNode druidNode)
     public void start()
     {
       try {
-        cache.getListenable().addListener(
-            (client, event) -> handleChildEvent(client, event)
-        );
+        cache.getListenable().addListener((client, event) -> handleChildEvent(event));
         cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
       }
       catch (Exception ex) {
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
index 807440bda21..d9148c34f8b 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
@@ -20,7 +20,6 @@
 package org.apache.druid.discovery;
 
 import java.util.Collection;
-import java.util.List;
 
 /**
  * Interface for discovering Druid Nodes announced by DruidNodeAnnouncer.
@@ -39,14 +38,16 @@
    */
   interface Listener
   {
+    void nodesAdded(Collection<DiscoveryDruidNode> nodes);
+
+    void nodesRemoved(Collection<DiscoveryDruidNode> nodes);
+
     /**
-     * List of nodes added.
-     * First call to this method is also a signal that underlying cache in the DruidNodeDiscovery implementation
-     * has been initialized.
-     * @param nodes
+     * Called once when the underlying cache in the DruidNodeDiscovery implementation has been initialized.
      */
-    void nodesAdded(List<DiscoveryDruidNode> nodes);
-
-    void nodesRemoved(List<DiscoveryDruidNode> nodes);
+    default void nodeViewInitialized()
+    {
+      // do nothing
+    }
   }
 }
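
A minimal, hypothetical implementation of the revised Listener interface, showing the intended use of nodeViewInitialized(): a caller blocks on a latch until the discovery cache reports a complete initial view. This mirrors the HttpRemoteTaskRunner change above; the class itself is made up and assumes the Druid discovery classes from this PR are on the classpath:

    import org.apache.druid.discovery.DiscoveryDruidNode;
    import org.apache.druid.discovery.DruidNodeDiscovery;

    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;

    class LatchedNodeListener implements DruidNodeDiscovery.Listener
    {
      private final Map<String, DiscoveryDruidNode> view = new ConcurrentHashMap<>();
      private final CountDownLatch viewInitialized = new CountDownLatch(1);

      @Override
      public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
      {
        nodes.forEach(node -> view.put(node.getDruidNode().getHostAndPortToUse(), node));
      }

      @Override
      public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
      {
        nodes.forEach(node -> view.remove(node.getDruidNode().getHostAndPortToUse()));
      }

      @Override
      public void nodeViewInitialized()
      {
        // CountDownLatch.countDown() does nothing once the count is 0, so a
        // duplicate callback would be harmless here.
        viewInitialized.countDown();
      }

      void awaitInitialized() throws InterruptedException
      {
        viewInitialized.await();
      }
    }
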
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
index 9ae34a3c19a..79e73c158fc 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
@@ -19,7 +19,7 @@
 
 package org.apache.druid.discovery;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.java.util.common.IAE;
@@ -28,7 +28,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -66,10 +65,11 @@ public DruidNodeDiscovery getForService(String serviceName)
           if (nodeTypesToWatch == null) {
             throw new IAE("Unknown service [%s].", service);
           }
-
-          ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service);
+          ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service, nodeTypesToWatch.size());
+          DruidNodeDiscovery.Listener filteringGatheringUpstreamListener =
+              serviceDiscovery.filteringUpstreamListener();
           for (NodeType nodeType : nodeTypesToWatch) {
-            getForNodeType(nodeType).registerListener(serviceDiscovery.nodeTypeListener());
+            getForNodeType(nodeType).registerListener(filteringGatheringUpstreamListener);
           }
           return serviceDiscovery;
         }
@@ -82,55 +82,66 @@ public DruidNodeDiscovery getForService(String serviceName)
 
     private final String service;
     private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
+    private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
 
     private final List<Listener> listeners = new ArrayList<>();
 
     private final Object lock = new Object();
 
-    private Set<NodeTypeListener> uninitializedNodeTypeListeners = new HashSet<>();
+    private int uninitializedNodeTypes;
 
-    ServiceDruidNodeDiscovery(String service)
+    ServiceDruidNodeDiscovery(String service, int watchedNodeTypes)
     {
+      Preconditions.checkArgument(watchedNodeTypes > 0);
       this.service = service;
+      this.uninitializedNodeTypes = watchedNodeTypes;
     }
 
     @Override
     public Collection<DiscoveryDruidNode> getAllNodes()
     {
-      return Collections.unmodifiableCollection(nodes.values());
+      return unmodifiableNodes;
     }
 
     @Override
     public void registerListener(Listener listener)
     {
+      if (listener instanceof FilteringUpstreamListener) {
+        throw new IAE("FilteringUpstreamListener should not be registered with ServiceDruidNodeDiscovery itself");
+      }
       synchronized (lock) {
-        if (uninitializedNodeTypeListeners.isEmpty()) {
-          listener.nodesAdded(ImmutableList.copyOf(nodes.values()));
+        if (!unmodifiableNodes.isEmpty()) {
+          listener.nodesAdded(unmodifiableNodes);
+        }
+        if (uninitializedNodeTypes == 0) {
+          listener.nodeViewInitialized();
         }
         listeners.add(listener);
       }
     }
 
-    NodeTypeListener nodeTypeListener()
+    DruidNodeDiscovery.Listener filteringUpstreamListener()
     {
-      NodeTypeListener nodeListener = new NodeTypeListener();
-      uninitializedNodeTypeListeners.add(nodeListener);
-      return nodeListener;
+      return new FilteringUpstreamListener();
     }
 
-    class NodeTypeListener implements DruidNodeDiscovery.Listener
+    /**
+     * Listens for all node updates and filters them based on {@link #service}. Note: this listener is registered with
+     * the objects returned from {@link #getForNodeType(NodeType)}, NOT with {@link ServiceDruidNodeDiscovery} itself.
+     */
+    class FilteringUpstreamListener implements DruidNodeDiscovery.Listener
     {
       @Override
-      public void nodesAdded(List<DiscoveryDruidNode> nodesDiscovered)
+      public void nodesAdded(Collection<DiscoveryDruidNode> nodesDiscovered)
       {
         synchronized (lock) {
-          ImmutableList.Builder<DiscoveryDruidNode> builder = ImmutableList.builder();
+          List<DiscoveryDruidNode> nodesAdded = new ArrayList<>();
           for (DiscoveryDruidNode node : nodesDiscovered) {
             if (node.getServices().containsKey(service)) {
               DiscoveryDruidNode prev = nodes.putIfAbsent(node.getDruidNode().getHostAndPortToUse(), node);
 
               if (prev == null) {
-                builder.add(node);
+                nodesAdded.add(node);
               } else {
                 log.warn("Node[%s] discovered but already exists [%s].", node, prev);
               }
@@ -139,48 +150,70 @@ public void nodesAdded(List<DiscoveryDruidNode> nodesDiscovered)
             }
           }
 
-          ImmutableList<DiscoveryDruidNode> newNodesAdded = null;
-          if (uninitializedNodeTypeListeners.isEmpty()) {
-            newNodesAdded = builder.build();
-          } else if (uninitializedNodeTypeListeners.remove(this) && uninitializedNodeTypeListeners.isEmpty()) {
-            newNodesAdded = ImmutableList.copyOf(nodes.values());
+          if (nodesAdded.isEmpty()) {
+            // Don't bother listeners with an empty update; it doesn't make sense.
+            return;
           }
 
-          if (newNodesAdded != null) {
-            for (Listener listener : listeners) {
-              try {
-                listener.nodesAdded(newNodesAdded);
-              }
-              catch (Exception ex) {
-                log.error(ex, "Listener[%s].nodesAdded(%s) threw exception. Ignored.", listener, newNodesAdded);
-              }
+          Collection<DiscoveryDruidNode> unmodifiableNodesAdded = Collections.unmodifiableCollection(nodesAdded);
+          for (Listener listener : listeners) {
+            try {
+              listener.nodesAdded(unmodifiableNodesAdded);
+            }
+            catch (Exception ex) {
+              log.error(ex, "Listener[%s].nodesAdded(%s) threw exception. Ignored.", listener, nodesAdded);
             }
           }
         }
       }
 
       @Override
-      public void nodesRemoved(List<DiscoveryDruidNode> nodesDisappeared)
+      public void nodesRemoved(Collection<DiscoveryDruidNode> nodesDisappeared)
       {
         synchronized (lock) {
-          ImmutableList.Builder<DiscoveryDruidNode> builder = ImmutableList.builder();
+          List<DiscoveryDruidNode> nodesRemoved = new ArrayList<>();
           for (DiscoveryDruidNode node : nodesDisappeared) {
             DiscoveryDruidNode prev = nodes.remove(node.getDruidNode().getHostAndPortToUse());
             if (prev != null) {
-              builder.add(node);
+              nodesRemoved.add(node);
             } else {
               log.warn("Node[%s] disappeared but was unknown for service listener [%s].", node, service);
             }
           }
 
-          if (uninitializedNodeTypeListeners.isEmpty()) {
-            ImmutableList<DiscoveryDruidNode> nodesRemoved = builder.build();
+          if (nodesRemoved.isEmpty()) {
+            // Don't bother listeners with an empty update; it doesn't make sense.
+            return;
+          }
+
+          Collection<DiscoveryDruidNode> unmodifiableNodesRemoved = Collections.unmodifiableCollection(nodesRemoved);
+          for (Listener listener : listeners) {
+            try {
+              listener.nodesRemoved(unmodifiableNodesRemoved);
+            }
+            catch (Exception ex) {
+              log.error(ex, "Listener[%s].nodesRemoved(%s) threw exception. Ignored.", listener, nodesRemoved);
+            }
+          }
+        }
+      }
+
+      @Override
+      public void nodeViewInitialized()
+      {
+        synchronized (lock) {
+          if (uninitializedNodeTypes == 0) {
+            log.error("Unexpected call of nodeViewInitialized()");
+            return;
+          }
+          uninitializedNodeTypes--;
+          if (uninitializedNodeTypes == 0) {
             for (Listener listener : listeners) {
               try {
-                listener.nodesRemoved(nodesRemoved);
+                listener.nodeViewInitialized();
               }
               catch (Exception ex) {
-                log.error(ex, "Listener[%s].nodesRemoved(%s) threw exception. Ignored.", listener, nodesRemoved);
+                log.error(ex, "Listener[%s].nodeViewInitialized() threw exception. Ignored.", listener);
               }
             }
           }
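
The counting scheme that replaces uninitializedNodeTypeListeners, reduced to its essence (a hypothetical standalone sketch, not Druid code): N upstream initialization signals are collapsed into a single downstream one.

    class InitializationAggregator
    {
      private final Object lock = new Object();
      private final Runnable onAllInitialized;
      private int uninitializedUpstreams;

      InitializationAggregator(int upstreams, Runnable onAllInitialized)
      {
        if (upstreams <= 0) {
          throw new IllegalArgumentException("upstreams must be positive");
        }
        this.uninitializedUpstreams = upstreams;
        this.onAllInitialized = onAllInitialized;
      }

      /** Called once by each upstream when its own view is initialized. */
      void upstreamInitialized()
      {
        synchronized (lock) {
          if (uninitializedUpstreams == 0) {
            return; // unexpected duplicate signal, ignore it
          }
          uninitializedUpstreams--;
          if (uninitializedUpstreams == 0) {
            onAllInitialized.run();
          }
        }
      }
    }
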
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java
index f06ffd010b0..4c6efc08ad4 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java
@@ -61,33 +61,25 @@ public AppenderatorDriverMetadata(
       final Map<String, List<SegmentWithState>> newMetadata = new HashMap<>();
       final Set<String> activeSegmentsAlreadySeen = new HashSet<>(); // temp data structure
 
-      activeSegments.entrySet()
-                    .forEach(sequenceSegments -> newMetadata.put(
-                        sequenceSegments.getKey(),
-                        sequenceSegments.getValue()
-                                        .stream()
-                                        .map(segmentIdentifier -> {
-                                          activeSegmentsAlreadySeen.add(segmentIdentifier.toString());
-                                          return SegmentWithState.newSegment(segmentIdentifier);
-                                        })
-                                        .collect(Collectors.toList())
-                    ));
+      activeSegments.forEach((String sequence, List<SegmentIdentifier> sequenceSegments) -> newMetadata.put(
+          sequence,
+          sequenceSegments
+              .stream()
+              .map(segmentIdentifier -> {
+                activeSegmentsAlreadySeen.add(segmentIdentifier.toString());
+                return SegmentWithState.newSegment(segmentIdentifier);
+              })
+              .collect(Collectors.toList())
+      ));
       // publishPendingSegments is a superset of activeSegments
-      publishPendingSegments.entrySet()
-                            .forEach(sequenceSegments -> newMetadata.computeIfAbsent(
-                                sequenceSegments.getKey(),
-                                k -> new ArrayList<>()
-                            ).addAll(
-                                sequenceSegments.getValue()
-                                                .stream()
-                                                .filter(segmentIdentifier -> !activeSegmentsAlreadySeen.contains(
-                                                    segmentIdentifier.toString()))
-                                                .map(segmentIdentifier -> SegmentWithState.newSegment(
-                                                    segmentIdentifier,
-                                                    SegmentState.APPEND_FINISHED
-                                                ))
-                                                .collect(Collectors.toList())
-                            ));
+      publishPendingSegments.forEach((sequence, sequenceSegments) -> {
+        List<SegmentWithState> segmentWithStates = newMetadata.computeIfAbsent(sequence, seq -> new ArrayList<>());
+        sequenceSegments
+            .stream()
+            .filter(segmentIdentifier -> !activeSegmentsAlreadySeen.contains(segmentIdentifier.toString()))
+            .map(segmentIdentifier -> SegmentWithState.newSegment(segmentIdentifier, SegmentState.APPEND_FINISHED))
+            .forEach(segmentWithStates::add);
+      });
       this.segments = newMetadata;
     } else {
       this.segments = segments;
diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
index 1ad59fb501f..40100fbae38 100644
--- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
+++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
@@ -78,7 +78,7 @@ public HostAndPortWithScheme apply(@Nullable DiscoveryDruidNode input)
   {
     ImmutableSet.Builder<String> builder = new ImmutableSet.Builder<>();
 
-    druidNodeDiscovery.getAllNodes().stream().forEach(
+    druidNodeDiscovery.getAllNodes().forEach(
         node -> builder.add(((LookupNodeService) node.getServices()
                                                      .get(LookupNodeService.DISCOVERY_SERVICE_KEY)).getLookupTier())
     );
diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
index fcc3303e7c1..9263b351b48 100644
--- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
+++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
@@ -41,6 +41,7 @@
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -131,7 +132,7 @@ public void start()
           new DruidNodeDiscovery.Listener()
           {
             @Override
-            public void nodesAdded(List<DiscoveryDruidNode> nodes)
+            public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
             {
               nodes.forEach(
                   (node) -> {
@@ -144,7 +145,7 @@ public void nodesAdded(List<DiscoveryDruidNode> nodes)
             }
 
             @Override
-            public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+            public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
             {
               nodes.forEach(
                   (node) -> {
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index 01097e61754..be49011860f 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -281,6 +281,7 @@ public void testSimple() throws Exception
     public void registerListener(Listener listener)
     {
       listener.nodesAdded(ImmutableList.of());
+      listener.nodeViewInitialized();
       this.listener = listener;
     }
   }
diff --git a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
index e4f577ed02f..fcf6b6b3627 100644
--- a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
+++ b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
@@ -39,7 +39,6 @@
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 /**
@@ -129,13 +128,13 @@ public void testAnnouncementAndDiscovery() throws Exception
         new DruidNodeDiscovery.Listener()
         {
           @Override
-          public void nodesAdded(List<DiscoveryDruidNode> nodes)
+          public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
           {
             coordNodes.addAll(nodes);
           }
 
           @Override
-          public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+          public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
           {
             coordNodes.removeAll(nodes);
           }
@@ -147,13 +146,13 @@ public void nodesRemoved(List<DiscoveryDruidNode> nodes)
         new DruidNodeDiscovery.Listener()
         {
           @Override
-          public void nodesAdded(List<DiscoveryDruidNode> nodes)
+          public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
           {
             overlordNodes.addAll(nodes);
           }
 
           @Override
-          public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+          public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
           {
             overlordNodes.removeAll(nodes);
           }
diff --git a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
index d361df528e8..2b5722d6742 100644
--- a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
@@ -28,6 +28,7 @@
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -47,13 +48,13 @@ public void testGetForService()
         new DruidNodeDiscovery.Listener()
         {
           @Override
-          public void nodesAdded(List<DiscoveryDruidNode> nodes)
+          public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
           {
             dataNodes.addAll(nodes);
           }
 
           @Override
-          public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+          public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
           {
             dataNodes.removeAll(nodes);
           }
@@ -66,13 +67,13 @@ public void nodesRemoved(List<DiscoveryDruidNode> nodes)
         new DruidNodeDiscovery.Listener()
         {
           @Override
-          public void nodesAdded(List<DiscoveryDruidNode> nodes)
+          public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
           {
             lookupNodes.addAll(nodes);
           }
 
           @Override
-          public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+          public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
           {
             lookupNodes.removeAll(nodes);
           }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 2b4a8c5c75e..0810a02de3b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -71,7 +71,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 /**
  * This tests zookeeper specific coordinator/load queue/historical interactions, such as moving segments by the balancer
@@ -383,7 +382,7 @@ public void testMoveSegment() throws Exception
     // clean up drop from load queue
     curator.delete().guaranteed().forPath(ZKPaths.makePath(SOURCE_LOAD_PATH, segmentToMove.getIdentifier()));
 
-    List<DruidServer> servers = serverView.getInventory().stream().collect(Collectors.toList());
+    List<DruidServer> servers = new ArrayList<>(serverView.getInventory());
 
     Assert.assertEquals(2, servers.get(0).getSegments().size());
     Assert.assertEquals(2, servers.get(1).getSegments().size());
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index a85491480f9..56ca7ec6cd4 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -185,6 +185,7 @@ public void testLoadDropAfterStop() throws Exception
     public void registerListener(Listener listener)
     {
       listener.nodesAdded(ImmutableList.of());
+      listener.nodeViewInitialized();
       this.listener = listener;
     }
   }
diff --git a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
index 2ef2b850542..81f2c103032 100644
--- a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
+++ b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
@@ -103,6 +103,7 @@ public void setUp()
       public void registerListener(Listener listener)
       {
         listener.nodesAdded(ImmutableList.of(node1, node2, node3));
+        listener.nodeViewInitialized();
       }
     };
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java
index 3b4985c6d08..7f0ab00f08c 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java
@@ -34,8 +34,8 @@
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
-import java.util.Arrays;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class CeilOperatorConversion implements SqlOperatorConversion
 {
@@ -80,14 +80,17 @@ public DruidExpression toDruidExpression(
       // So there is no simple extraction for this operator.
       return DruidExpression.fromFunctionCall(
           "timestamp_ceil",
-          Arrays.asList(
-              druidExpression.getExpression(),
-              DruidExpression.stringLiteral(granularity.getPeriod().toString()),
-              DruidExpression.numberLiteral(
-                  granularity.getOrigin() == null ? null : granularity.getOrigin().getMillis()
-              ),
-              DruidExpression.stringLiteral(granularity.getTimeZone().toString())
-          ).stream().map(DruidExpression::fromExpression).collect(Collectors.toList())
+          Stream
+              .of(
+                  druidExpression.getExpression(),
+                  DruidExpression.stringLiteral(granularity.getPeriod().toString()),
+                  DruidExpression.numberLiteral(
+                      granularity.getOrigin() == null ? null : granularity.getOrigin().getMillis()
+                  ),
+                  DruidExpression.stringLiteral(granularity.getTimeZone().toString())
+              )
+              .map(DruidExpression::fromExpression)
+              .collect(Collectors.toList())
       );
     } else {
       // WTF? CEIL with 3 arguments?
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
index 5be00360395..a3c35c00334 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
@@ -92,8 +92,8 @@ public static Aggregation translateAggregateCall(
     final List<Aggregation> existingAggregationsWithSameFilter = new ArrayList<>();
     for (Aggregation existingAggregation : existingAggregations) {
       if (filter == null) {
-        final boolean doesMatch = existingAggregation.getAggregatorFactories().stream().allMatch(
-            factory -> !(factory instanceof FilteredAggregatorFactory)
+        final boolean doesMatch = existingAggregation.getAggregatorFactories().stream().noneMatch(
+            factory -> factory instanceof FilteredAggregatorFactory
         );
 
         if (doesMatch) {
@@ -160,6 +160,6 @@ private static boolean isUsingExistingAggregation(
         .map(AggregatorFactory::getName)
         .collect(Collectors.toSet());
 
-    return aggregation.getPostAggregator().getDependentFields().stream().allMatch(existingAggregationNames::contains);
+    return existingAggregationNames.containsAll(aggregation.getPostAggregator().getDependentFields());
   }
 }
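
Several hunks above apply the same bulk-collection simplification: an allMatch/noneMatch stream over another collection's membership test becomes a single Collection.containsAll call. A standalone illustration (not code from this PR):

    import java.util.List;
    import java.util.Set;

    class ContainsAllExample
    {
      static boolean allKnown(List<String> names, Set<String> known)
      {
        // Stream form: names.stream().allMatch(known::contains)
        // Equivalent bulk call, as used in GroupByRules and IndexMergerNullHandlingTest:
        return known.containsAll(names);
      }
    }
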


 
