Posted to commits@druid.apache.org by le...@apache.org on 2018/12/01 00:13:03 UTC

[incubator-druid] branch master updated: Simplify DruidNodeDiscoveryProvider; add DruidNodeDiscovery.Listener.nodeViewInitialized() (#6606)

This is an automated email from the ASF dual-hosted git repository.

leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ec38df7  Simplify DruidNodeDiscoveryProvider; add DruidNodeDiscovery.Listener.nodeViewInitialized() (#6606)
ec38df7 is described below

commit ec38df75756b975461bf1234e26cd195c462107d
Author: Roman Leventov <le...@gmail.com>
AuthorDate: Sat Dec 1 01:12:56 2018 +0100

    Simplify DruidNodeDiscoveryProvider; add DruidNodeDiscovery.Listener.nodeViewInitialized() (#6606)
    
    * Simplify DruidNodeDiscoveryProvider; add DruidNodeDiscovery.Listener.nodeViewInitialized() method; prohibit and eliminate some suboptimal Java 8 patterns
    
    * Fix style
    
    * Fix HttpEmitterTest.timeoutEmptyQueue()
    
    * Add DruidNodeDiscovery.Listener.nodeViewInitialized() calls in tests
    
    * Clarify code
---
 .idea/inspectionProfiles/Druid.xml                 |   2 +
 .../java/util/emitter/core/HttpPostEmitter.java    |  31 +++---
 .../data/input/impl/FileIteratingFirehoseTest.java |   3 +-
 .../java/util/emitter/core/HttpEmitterTest.java    |  12 ++-
 .../indexing/kafka/LegacyKafkaIndexTaskRunner.java |   7 +-
 .../overlord/hrtr/HttpRemoteTaskRunner.java        |  16 +--
 .../overlord/hrtr/HttpRemoteTaskRunnerTest.java    |   1 +
 .../apache/druid/query/filter/AndDimFilter.java    |   3 +-
 .../org/apache/druid/query/filter/OrDimFilter.java |   3 +-
 .../apache/druid/query/groupby/GroupByQuery.java   |   2 +-
 .../druid/segment/IndexMergerNullHandlingTest.java |   2 +-
 .../druid/client/HttpServerInventoryView.java      |  22 ++---
 .../CuratorDruidNodeDiscoveryProvider.java         |  82 ++++++++--------
 .../apache/druid/discovery/DruidNodeDiscovery.java |  17 ++--
 .../discovery/DruidNodeDiscoveryProvider.java      | 109 ++++++++++++++-------
 .../appenderator/AppenderatorDriverMetadata.java   |  44 ++++-----
 .../server/lookup/cache/LookupNodeDiscovery.java   |   2 +-
 .../server/router/TieredBrokerHostSelector.java    |   5 +-
 .../druid/client/HttpServerInventoryViewTest.java  |   1 +
 .../CuratorDruidNodeAnnouncerAndDiscoveryTest.java |   9 +-
 .../discovery/DruidNodeDiscoveryProviderTest.java  |   9 +-
 .../coordinator/CuratorDruidCoordinatorTest.java   |   3 +-
 .../server/coordinator/HttpLoadQueuePeonTest.java  |   1 +
 .../router/TieredBrokerHostSelectorTest.java       |   1 +
 .../expression/builtin/CeilOperatorConversion.java |  21 ++--
 .../druid/sql/calcite/rule/GroupByRules.java       |   6 +-
 26 files changed, 236 insertions(+), 178 deletions(-)

diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index 6bb556d..dfedeb4 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -76,6 +76,7 @@
     <inspection_tool class="InvalidComparatorMethodReference" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="IteratorHasNextCallsIteratorNext" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="IteratorNextDoesNotThrowNoSuchElementException" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="Java8MapForEach" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="JsonDuplicatePropertyKeys" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="JsonStandardCompliance" enabled="true" level="WARNING" enabled_by_default="true" />
     <inspection_tool class="LengthOneStringInIndexOf" enabled="true" level="ERROR" enabled_by_default="true" />
@@ -229,6 +230,7 @@
         <constraint name="E" within="" contains="" />
       </searchConfiguration>
     </inspection_tool>
+    <inspection_tool class="SimplifyStreamApiCallChains" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
       <option name="processCode" value="true" />
       <option name="processLiterals" value="true" />
diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
index 8b486cb..d2da5a7 100644
--- a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
+++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
@@ -140,7 +140,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
   private final AtomicInteger allocatedBuffers = new AtomicInteger();
   private final AtomicInteger droppedBuffers = new AtomicInteger();
 
-  private volatile long lastFillTimeMillis;
+  private volatile long lastBatchFillTimeMillis;
   private final ConcurrentTimeCounter batchFillingTimeCounter = new ConcurrentTimeCounter();
 
   private final Object startLock = new Object();
@@ -180,8 +180,8 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
     emittingThread = new EmittingThread(config);
     long firstBatchNumber = 1;
     concurrentBatch.set(new Batch(this, acquireBuffer(), firstBatchNumber));
-    // lastFillTimeMillis must not be 0, minHttpTimeoutMillis could be.
-    lastFillTimeMillis = Math.max(config.minHttpTimeoutMillis, 1);
+    // lastBatchFillTimeMillis must not be 0, minHttpTimeoutMillis could be.
+    lastBatchFillTimeMillis = Math.max(config.minHttpTimeoutMillis, 1);
   }
 
   @Override
@@ -328,7 +328,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
     if (elapsedTimeMillis > 0) {
       // If elapsedTimeMillis is 0 or negative, it's likely because System.currentTimeMillis() is not monotonic, so not
       // accounting this time for determining batch sending timeout.
-      lastFillTimeMillis = elapsedTimeMillis;
+      lastBatchFillTimeMillis = elapsedTimeMillis;
     }
     addBatchToEmitQueue(batch);
     wakeUpEmittingThread();
@@ -663,7 +663,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
      */
     private boolean sendWithRetries(final byte[] buffer, final int length, final int eventCount, boolean withTimeout)
     {
-      long deadLineMillis = System.currentTimeMillis() + sendRequestTimeoutMillis(lastFillTimeMillis);
+      long deadLineMillis = System.currentTimeMillis() + computeTimeoutForSendRequestInMillis(lastBatchFillTimeMillis);
       try {
         RetryUtils.retry(
             new RetryUtils.Task<Object>()
@@ -709,8 +709,8 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
 
     private void send(byte[] buffer, int length) throws Exception
     {
-      long lastFillTimeMillis = HttpPostEmitter.this.lastFillTimeMillis;
-      final long timeoutMillis = sendRequestTimeoutMillis(lastFillTimeMillis);
+      long lastFillTimeMillis = HttpPostEmitter.this.lastBatchFillTimeMillis;
+      final long timeoutMillis = computeTimeoutForSendRequestInMillis(lastFillTimeMillis);
       if (timeoutMillis < config.getMinHttpTimeoutMillis()) {
         throw timeoutLessThanMinimumException;
       }
@@ -795,18 +795,27 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
       accountSuccessfulSending(sendingStartMs);
     }
 
-    private long sendRequestTimeoutMillis(long lastFillTimeMillis)
+    /**
+     * This method computes the timeout for sending a batch of events over HTTP, based on how much time it took to
+     * populate that batch. The idea is that if it took X milliseconds to fill the batch, we shouldn't wait for more
+     * than X * {@link HttpEmitterConfig#httpTimeoutAllowanceFactor} milliseconds to send that data, because the next
+     * batch is probably being filled at about the same speed, so we have to keep up with it.
+     *
+     * Ideally it should use something like a moving average instead of the plain last batch fill time to accommodate
+     * emitting bursts, but that might be unnecessary because a Druid application might not produce events in bursts.
+     */
+    private long computeTimeoutForSendRequestInMillis(long lastBatchFillTimeMillis)
     {
       int emitQueueSize = approximateBuffersToEmitCount.get();
       if (emitQueueSize < EMIT_QUEUE_THRESHOLD_1) {
-        return (long) (lastFillTimeMillis * config.httpTimeoutAllowanceFactor);
+        return (long) (lastBatchFillTimeMillis * config.httpTimeoutAllowanceFactor);
       }
       if (emitQueueSize < EMIT_QUEUE_THRESHOLD_2) {
         // The idea is to not let buffersToEmit queue to grow faster than we can emit buffers.
-        return (long) (lastFillTimeMillis * EQUILIBRIUM_ALLOWANCE_FACTOR);
+        return (long) (lastBatchFillTimeMillis * EQUILIBRIUM_ALLOWANCE_FACTOR);
       }
       // If buffersToEmit still grows, try to restrict even more
-      return (long) (lastFillTimeMillis * TIGHT_ALLOWANCE_FACTOR);
+      return (long) (lastBatchFillTimeMillis * TIGHT_ALLOWANCE_FACTOR);
     }
 
     private void accountSuccessfulSending(long sendingStartMs)
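
The renamed computeTimeoutForSendRequestInMillis() ties the HTTP send timeout to how long the last batch took to fill, tightening the allowance as the emit queue grows. A minimal sketch of the three tiers, using hypothetical threshold and factor values (the real constants are defined in HttpPostEmitter and HttpEmitterConfig):

    class TimeoutSketch
    {
      static final int EMIT_QUEUE_THRESHOLD_1 = 10;           // assumed value
      static final int EMIT_QUEUE_THRESHOLD_2 = 50;           // assumed value
      static final double EQUILIBRIUM_ALLOWANCE_FACTOR = 1.0; // assumed value
      static final double TIGHT_ALLOWANCE_FACTOR = 0.5;       // assumed value

      static long timeoutMillis(long lastBatchFillTimeMillis, int emitQueueSize, double allowanceFactor)
      {
        if (emitQueueSize < EMIT_QUEUE_THRESHOLD_1) {
          // Queue is small: allow a generous multiple of the fill time.
          return (long) (lastBatchFillTimeMillis * allowanceFactor);
        }
        if (emitQueueSize < EMIT_QUEUE_THRESHOLD_2) {
          // Queue is growing: send no slower than batches are being filled.
          return (long) (lastBatchFillTimeMillis * EQUILIBRIUM_ALLOWANCE_FACTOR);
        }
        // Queue keeps growing: restrict even further.
        return (long) (lastBatchFillTimeMillis * TIGHT_ALLOWANCE_FACTOR);
      }
    }

For example, with a 500 ms fill time and an allowance factor of 2.0, a near-empty queue gets a 1000 ms timeout; past the second threshold the same fill time yields only 250 ms under the assumed tight factor of 0.5.
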
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java b/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java
index 09f5ded..7683258 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java
@@ -91,7 +91,8 @@ public class FileIteratingFirehoseTest
     this.expectedResults = inputs.stream()
         .map(input -> input.split("\n"))
         .flatMap(lines -> {
-          final List<String> filteredLines = Arrays.asList(lines).stream()
+          final List<String> filteredLines = Arrays
+              .stream(lines)
               .filter(line -> line.length() > 0)
               .map(line -> line.split(",")[1])
               .collect(Collectors.toList());
diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
index 812e665..07acd4a 100644
--- a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
@@ -24,6 +24,7 @@ import com.google.common.primitives.Ints;
 import org.asynchttpclient.ListenableFuture;
 import org.asynchttpclient.Request;
 import org.asynchttpclient.Response;
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,21 +66,26 @@ public class HttpEmitterTest
   @Test
   public void timeoutEmptyQueue() throws IOException, InterruptedException
   {
+    float timeoutAllowanceFactor = 2.0f;
     final HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
         .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
-        .setHttpTimeoutAllowanceFactor(2.0f)
+        .setHttpTimeoutAllowanceFactor(timeoutAllowanceFactor)
         .build();
     final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper);
 
+    long startMs = System.currentTimeMillis();
     emitter.start();
     emitter.emitAndReturnBatch(new IntEvent());
     emitter.flush();
-    Assert.assertTrue(timeoutUsed.get() < 5);
+    long fillTimeMs = System.currentTimeMillis() - startMs;
+    Assert.assertThat((double) timeoutUsed.get(), Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
 
+    startMs = System.currentTimeMillis();
     final Batch batch = emitter.emitAndReturnBatch(new IntEvent());
     Thread.sleep(1000);
     batch.seal();
     emitter.flush();
-    Assert.assertTrue(timeoutUsed.get() >= 2000 && timeoutUsed.get() < 3000);
+    fillTimeMs = System.currentTimeMillis() - startMs;
+    Assert.assertThat((double) timeoutUsed.get(), Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
   }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index 0a4dac7..75d2688 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -86,6 +86,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -454,9 +455,9 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
                 if (isPersistRequired) {
                   driver.persist(committerSupplier.get());
                 }
-                segmentsToMoveOut.entrySet().forEach(sequenceSegments -> driver.moveSegmentOut(
-                    sequenceSegments.getKey(),
-                    sequenceSegments.getValue().stream().collect(Collectors.toList())
+                segmentsToMoveOut.forEach((String sequence, Set<SegmentIdentifier> segments) -> driver.moveSegmentOut(
+                    sequence,
+                    new ArrayList<SegmentIdentifier>(segments)
                 ));
               }
               catch (ParseException e) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index bf96b10..5cb99ba 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -443,18 +443,22 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
         new DruidNodeDiscovery.Listener()
         {
           @Override
-          public void nodesAdded(List<DiscoveryDruidNode> nodes)
+          public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
           {
-            nodes.stream().forEach(node -> addWorker(toWorker(node)));
+            nodes.forEach(node -> addWorker(toWorker(node)));
+          }
 
-            //CountDownLatch.countDown() does nothing when count has already reached 0.
-            workerViewInitialized.countDown();
+          @Override
+          public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
+          {
+            nodes.forEach(node -> removeWorker(toWorker(node)));
           }
 
           @Override
-          public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+          public void nodeViewInitialized()
           {
-            nodes.stream().forEach(node -> removeWorker(toWorker(node)));
+            //CountDownLatch.countDown() does nothing when count has already reached 0.
+            workerViewInitialized.countDown();
           }
         }
     );
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index a130883..25e172a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -1380,6 +1380,7 @@ public class HttpRemoteTaskRunnerTest
     public void registerListener(Listener listener)
     {
       listener.nodesAdded(ImmutableList.of());
+      listener.nodeViewInitialized();
       this.listener = listener;
     }
   }
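
This test change illustrates the new obligation on DruidNodeDiscovery implementations, test stubs included: signal initialization explicitly instead of relying on the first nodesAdded() call. A minimal sketch of the stub pattern repeated across the test files in this commit:

    import com.google.common.collect.ImmutableList;
    import org.apache.druid.discovery.DiscoveryDruidNode;
    import org.apache.druid.discovery.DruidNodeDiscovery;

    import java.util.Collection;

    class TestDruidNodeDiscovery implements DruidNodeDiscovery
    {
      Listener listener;

      @Override
      public Collection<DiscoveryDruidNode> getAllNodes()
      {
        throw new UnsupportedOperationException("not used in these tests");
      }

      @Override
      public void registerListener(Listener listener)
      {
        // An empty initial view is still a valid, fully initialized view.
        listener.nodesAdded(ImmutableList.of());
        listener.nodeViewInitialized();
        this.listener = listener;
      }
    }
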
diff --git a/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java b/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java
index 9537ff8..9903633 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java
@@ -102,8 +102,7 @@ public class AndDimFilter implements DimFilter
   public HashSet<String> getRequiredColumns()
   {
     HashSet<String> requiredColumns = new HashSet<>();
-    fields.stream()
-        .forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
+    fields.forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
     return requiredColumns;
   }
 
diff --git a/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java b/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java
index ce89431..d2ed4e7 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java
@@ -110,8 +110,7 @@ public class OrDimFilter implements DimFilter
   public HashSet<String> getRequiredColumns()
   {
     HashSet<String> requiredColumns = new HashSet<>();
-    fields.stream()
-        .forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
+    fields.forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
     return requiredColumns;
   }
 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index 1c7fb6c..8ab4601 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -737,7 +737,7 @@ public class GroupByQuery extends BaseQuery<Row>
   private static Map<String, AggregatorFactory> getAggregatorsMap(List<AggregatorFactory> aggregatorSpecs)
   {
     Map<String, AggregatorFactory> map = new HashMap<>(aggregatorSpecs.size());
-    aggregatorSpecs.stream().forEach(v -> map.put(v.getName(), v));
+    aggregatorSpecs.forEach(v -> map.put(v.getName(), v));
     return map;
   }
 
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
index 61dc753..f2f1cf0 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
@@ -119,7 +119,7 @@ public class IndexMergerNullHandlingTest
       try (QueryableIndex index = indexIO.loadIndex(indexMerger.persist(toPersist, tempDir, indexSpec, null))) {
         final ColumnHolder columnHolder = index.getColumnHolder("d");
 
-        if (subsetList.stream().allMatch(nullFlavors::contains)) {
+        if (nullFlavors.containsAll(subsetList)) {
           // all null -> should be missing
           Assert.assertNull(subsetList.toString(), columnHolder);
         } else {
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index 855f582..bf0d123 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -152,23 +152,23 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
               private final AtomicBoolean initialized = new AtomicBoolean(false);
 
               @Override
-              public void nodesAdded(List<DiscoveryDruidNode> nodes)
+              public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
               {
-                nodes.forEach(
-                    node -> serverAdded(toDruidServer(node))
-                );
+                nodes.forEach(node -> serverAdded(toDruidServer(node)));
+              }
 
-                if (!initialized.getAndSet(true)) {
-                  executor.execute(HttpServerInventoryView.this::serverInventoryInitialized);
-                }
+              @Override
+              public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
+              {
+                nodes.forEach(node -> serverRemoved(toDruidServer(node)));
               }
 
               @Override
-              public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+              public void nodeViewInitialized()
               {
-                nodes.forEach(
-                    node -> serverRemoved(toDruidServer(node))
-                );
+                if (!initialized.getAndSet(true)) {
+                  executor.execute(HttpServerInventoryView.this::serverInventoryInitialized);
+                }
               }
 
               private DruidServer toDruidServer(DiscoveryDruidNode node)
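
HttpServerInventoryView keeps its one-shot initialization guard but now triggers it from nodeViewInitialized() rather than from the first nodesAdded() call, so inventory initialization is signaled even when the initial node view is empty. The guard itself is the usual AtomicBoolean idiom, shown here as a generic sketch:

    import java.util.concurrent.atomic.AtomicBoolean;

    class OneShot
    {
      private final AtomicBoolean initialized = new AtomicBoolean(false);

      // getAndSet(true) atomically returns the previous value, so exactly one
      // caller observes false and runs the action, however many threads race.
      void runOnce(Runnable action)
      {
        if (!initialized.getAndSet(true)) {
          action.run();
        }
      }
    }
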
diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
index 46b8ce9..7025319 100644
--- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 
+import javax.annotation.concurrent.GuardedBy;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -156,6 +157,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
 
     // hostAndPort -> DiscoveryDruidNode
     private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
+    private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
 
     private final PathChildrenCache cache;
     private final ExecutorService cacheExecutor;
@@ -166,7 +168,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
 
     private final Object lock = new Object();
 
-    private CountDownLatch cacheInitialized = new CountDownLatch(1);
+    private final CountDownLatch cacheInitialized = new CountDownLatch(1);
 
     NodeTypeWatcher(
         ExecutorService listenerExecutor,
@@ -195,30 +197,39 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
     @Override
     public Collection<DiscoveryDruidNode> getAllNodes()
     {
-      if (!isCacheInitialized(30, TimeUnit.SECONDS)) {
+      boolean nodeViewInitialized;
+      try {
+        nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
+      }
+      catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        nodeViewInitialized = false;
+      }
+      if (!nodeViewInitialized) {
         log.info("cache is not initialized yet. getAllNodes() might not return full information.");
       }
-      return Collections.unmodifiableCollection(nodes.values());
+      return unmodifiableNodes;
     }
 
     @Override
     public void registerListener(DruidNodeDiscovery.Listener listener)
     {
       synchronized (lock) {
-        if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
-          ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
+        // No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
+        if (cacheInitialized.getCount() == 0) {
           safeSchedule(
               () -> {
-                listener.nodesAdded(currNodes);
+                listener.nodesAdded(unmodifiableNodes);
+                listener.nodeViewInitialized();
               },
-              "Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, listener
+              "Exception occured in nodesAdded([%s]) in listener [%s].", unmodifiableNodes, listener
           );
         }
         nodeListeners.add(listener);
       }
     }
 
-    public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent event)
+    void handleChildEvent(PathChildrenCacheEvent event)
     {
       synchronized (lock) {
         try {
@@ -278,20 +289,24 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
               break;
             }
             case INITIALIZED: {
-              if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
+              // No need to wait on CountDownLatch, because we are holding the lock under which it could only be
+              // counted down.
+              if (cacheInitialized.getCount() == 0) {
                 log.warn("cache is already initialized. ignoring [%s] event.", event.getType());
                 return;
               }
 
               log.info("Received INITIALIZED in node watcher.");
 
-              ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
-              for (Listener l : nodeListeners) {
+              for (Listener listener : nodeListeners) {
                 safeSchedule(
                     () -> {
-                      l.nodesAdded(currNodes);
+                      listener.nodesAdded(unmodifiableNodes);
+                      listener.nodeViewInitialized();
                     },
-                    "Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, l
+                    "Exception occured in nodesAdded([%s]) in listener [%s].",
+                    unmodifiableNodes,
+                    listener
                 );
               }
 
@@ -309,17 +324,6 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
       }
     }
 
-    private boolean isCacheInitialized(long waitFor, TimeUnit timeUnit)
-    {
-      try {
-        return cacheInitialized.await(waitFor, timeUnit);
-      }
-      catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        return false;
-      }
-    }
-
     private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args)
     {
       listenerExecutor.submit(() -> {
@@ -332,20 +336,20 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
       });
     }
 
+    @GuardedBy("lock")
     private void addNode(DiscoveryDruidNode druidNode)
     {
       DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
       if (prev == null) {
-        if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
+        // No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
+        if (cacheInitialized.getCount() == 0) {
           List<DiscoveryDruidNode> newNode = ImmutableList.of(druidNode);
-          for (Listener l : nodeListeners) {
+          for (Listener listener : nodeListeners) {
             safeSchedule(
-                () -> {
-                  l.nodesAdded(newNode);
-                },
+                () -> listener.nodesAdded(newNode),
                 "Exception occured in nodeAdded(node=[%s]) in listener [%s].",
                 druidNode.getDruidNode().getHostAndPortToUse(),
-                l
+                listener
             );
           }
         }
@@ -359,6 +363,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
       }
     }
 
+    @GuardedBy("lock")
     private void removeNode(DiscoveryDruidNode druidNode)
     {
       DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());
@@ -372,14 +377,15 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
         return;
       }
 
-      if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
+      // No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
+      if (cacheInitialized.getCount() == 0) {
         List<DiscoveryDruidNode> nodeRemoved = ImmutableList.of(druidNode);
-        for (Listener l : nodeListeners) {
+        for (Listener listener : nodeListeners) {
           safeSchedule(
-              () -> {
-                l.nodesRemoved(nodeRemoved);
-              },
-              "Exception occured in nodeRemoved(node=[%s]) in listener [%s].", druidNode.getDruidNode().getHostAndPortToUse(), l
+              () -> listener.nodesRemoved(nodeRemoved),
+              "Exception occured in nodeRemoved(node=[%s]) in listener [%s].",
+              druidNode.getDruidNode().getHostAndPortToUse(),
+              listener
           );
         }
       }
@@ -388,9 +394,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
     public void start()
     {
       try {
-        cache.getListenable().addListener(
-            (client, event) -> handleChildEvent(client, event)
-        );
+        cache.getListenable().addListener((client, event) -> handleChildEvent(event));
         cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
       }
       catch (Exception ex) {
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
index 807440b..d9148c3 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
@@ -20,7 +20,6 @@
 package org.apache.druid.discovery;
 
 import java.util.Collection;
-import java.util.List;
 
 /**
  * Interface for discovering Druid Nodes announced by DruidNodeAnnouncer.
@@ -39,14 +38,16 @@ public interface DruidNodeDiscovery
    */
   interface Listener
   {
+    void nodesAdded(Collection<DiscoveryDruidNode> nodes);
+
+    void nodesRemoved(Collection<DiscoveryDruidNode> nodes);
+
     /**
-     * List of nodes added.
-     * First call to this method is also a signal that underlying cache in the DruidNodeDiscovery implementation
-     * has been initialized.
-     * @param nodes
+     * Called once when the underlying cache in the DruidNodeDiscovery implementation has been initialized.
      */
-    void nodesAdded(List<DiscoveryDruidNode> nodes);
-
-    void nodesRemoved(List<DiscoveryDruidNode> nodes);
+    default void nodeViewInitialized()
+    {
+      // do nothing
+    }
   }
 }
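
This is the headline interface change: nodesAdded() and nodesRemoved() now accept Collection rather than List, and initialization becomes an explicit callback with a no-op default, so existing listeners compile unchanged. A minimal sketch of a listener migrated to the new contract (the private helpers are hypothetical):

    import java.util.Collection;

    class MigratedListener implements DruidNodeDiscovery.Listener
    {
      @Override
      public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
      {
        nodes.forEach(this::addNode);
      }

      @Override
      public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
      {
        nodes.forEach(this::removeNode);
      }

      @Override
      public void nodeViewInitialized()
      {
        // Previously implied by the first nodesAdded() call; now delivered
        // exactly once, even if the initial node view is empty.
        onViewReady();
      }

      private void addNode(DiscoveryDruidNode node) { /* hypothetical helper */ }
      private void removeNode(DiscoveryDruidNode node) { /* hypothetical helper */ }
      private void onViewReady() { /* hypothetical init hook */ }
    }
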
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
index 9ae34a3..79e73c1 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
@@ -19,7 +19,7 @@
 
 package org.apache.druid.discovery;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.java.util.common.IAE;
@@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.logger.Logger;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -66,10 +65,11 @@ public abstract class DruidNodeDiscoveryProvider
           if (nodeTypesToWatch == null) {
             throw new IAE("Unknown service [%s].", service);
           }
-
-          ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service);
+          ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service, nodeTypesToWatch.size());
+          DruidNodeDiscovery.Listener filteringGatheringUpstreamListener =
+              serviceDiscovery.filteringUpstreamListener();
           for (NodeType nodeType : nodeTypesToWatch) {
-            getForNodeType(nodeType).registerListener(serviceDiscovery.nodeTypeListener());
+            getForNodeType(nodeType).registerListener(filteringGatheringUpstreamListener);
           }
           return serviceDiscovery;
         }
@@ -82,55 +82,66 @@ public abstract class DruidNodeDiscoveryProvider
 
     private final String service;
     private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
+    private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
 
     private final List<Listener> listeners = new ArrayList<>();
 
     private final Object lock = new Object();
 
-    private Set<NodeTypeListener> uninitializedNodeTypeListeners = new HashSet<>();
+    private int uninitializedNodeTypes;
 
-    ServiceDruidNodeDiscovery(String service)
+    ServiceDruidNodeDiscovery(String service, int watchedNodeTypes)
     {
+      Preconditions.checkArgument(watchedNodeTypes > 0);
       this.service = service;
+      this.uninitializedNodeTypes = watchedNodeTypes;
     }
 
     @Override
     public Collection<DiscoveryDruidNode> getAllNodes()
     {
-      return Collections.unmodifiableCollection(nodes.values());
+      return unmodifiableNodes;
     }
 
     @Override
     public void registerListener(Listener listener)
     {
+      if (listener instanceof FilteringUpstreamListener) {
+        throw new IAE("FilteringUpstreamListener should not be registered with ServiceDruidNodeDiscovery itself");
+      }
       synchronized (lock) {
-        if (uninitializedNodeTypeListeners.isEmpty()) {
-          listener.nodesAdded(ImmutableList.copyOf(nodes.values()));
+        if (!unmodifiableNodes.isEmpty()) {
+          listener.nodesAdded(unmodifiableNodes);
+        }
+        if (uninitializedNodeTypes == 0) {
+          listener.nodeViewInitialized();
         }
         listeners.add(listener);
       }
     }
 
-    NodeTypeListener nodeTypeListener()
+    DruidNodeDiscovery.Listener filteringUpstreamListener()
     {
-      NodeTypeListener nodeListener = new NodeTypeListener();
-      uninitializedNodeTypeListeners.add(nodeListener);
-      return nodeListener;
+      return new FilteringUpstreamListener();
     }
 
-    class NodeTypeListener implements DruidNodeDiscovery.Listener
+    /**
+     * Listens for all node updates and filters them based on {@link #service}. Note: this listener is registered with
+     * the objects returned from {@link #getForNodeType(NodeType)}, NOT with {@link ServiceDruidNodeDiscovery} itself.
+     */
+    class FilteringUpstreamListener implements DruidNodeDiscovery.Listener
     {
       @Override
-      public void nodesAdded(List<DiscoveryDruidNode> nodesDiscovered)
+      public void nodesAdded(Collection<DiscoveryDruidNode> nodesDiscovered)
       {
         synchronized (lock) {
-          ImmutableList.Builder<DiscoveryDruidNode> builder = ImmutableList.builder();
+          List<DiscoveryDruidNode> nodesAdded = new ArrayList<>();
           for (DiscoveryDruidNode node : nodesDiscovered) {
             if (node.getServices().containsKey(service)) {
               DiscoveryDruidNode prev = nodes.putIfAbsent(node.getDruidNode().getHostAndPortToUse(), node);
 
               if (prev == null) {
-                builder.add(node);
+                nodesAdded.add(node);
               } else {
                 log.warn("Node[%s] discovered but already exists [%s].", node, prev);
               }
@@ -139,48 +150,70 @@ public abstract class DruidNodeDiscoveryProvider
             }
           }
 
-          ImmutableList<DiscoveryDruidNode> newNodesAdded = null;
-          if (uninitializedNodeTypeListeners.isEmpty()) {
-            newNodesAdded = builder.build();
-          } else if (uninitializedNodeTypeListeners.remove(this) && uninitializedNodeTypeListeners.isEmpty()) {
-            newNodesAdded = ImmutableList.copyOf(nodes.values());
+          if (nodesAdded.isEmpty()) {
+            // Don't bother listeners with an empty update, it doesn't make sense.
+            return;
           }
 
-          if (newNodesAdded != null) {
-            for (Listener listener : listeners) {
-              try {
-                listener.nodesAdded(newNodesAdded);
-              }
-              catch (Exception ex) {
-                log.error(ex, "Listener[%s].nodesAdded(%s) threw exception. Ignored.", listener, newNodesAdded);
-              }
+          Collection<DiscoveryDruidNode> unmodifiableNodesAdded = Collections.unmodifiableCollection(nodesAdded);
+          for (Listener listener : listeners) {
+            try {
+              listener.nodesAdded(unmodifiableNodesAdded);
+            }
+            catch (Exception ex) {
+              log.error(ex, "Listener[%s].nodesAdded(%s) threw exception. Ignored.", listener, nodesAdded);
             }
           }
         }
       }
 
       @Override
-      public void nodesRemoved(List<DiscoveryDruidNode> nodesDisappeared)
+      public void nodesRemoved(Collection<DiscoveryDruidNode> nodesDisappeared)
       {
         synchronized (lock) {
-          ImmutableList.Builder<DiscoveryDruidNode> builder = ImmutableList.builder();
+          List<DiscoveryDruidNode> nodesRemoved = new ArrayList<>();
           for (DiscoveryDruidNode node : nodesDisappeared) {
             DiscoveryDruidNode prev = nodes.remove(node.getDruidNode().getHostAndPortToUse());
             if (prev != null) {
-              builder.add(node);
+              nodesRemoved.add(node);
             } else {
               log.warn("Node[%s] disappeared but was unknown for service listener [%s].", node, service);
             }
           }
 
-          if (uninitializedNodeTypeListeners.isEmpty()) {
-            ImmutableList<DiscoveryDruidNode> nodesRemoved = builder.build();
+          if (nodesRemoved.isEmpty()) {
+            // Don't bother listeners with an empty update, it doesn't make sense.
+            return;
+          }
+
+          Collection<DiscoveryDruidNode> unmodifiableNodesRemoved = Collections.unmodifiableCollection(nodesRemoved);
+          for (Listener listener : listeners) {
+            try {
+              listener.nodesRemoved(unmodifiableNodesRemoved);
+            }
+            catch (Exception ex) {
+              log.error(ex, "Listener[%s].nodesRemoved(%s) threw exception. Ignored.", listener, nodesRemoved);
+            }
+          }
+        }
+      }
+
+      @Override
+      public void nodeViewInitialized()
+      {
+        synchronized (lock) {
+          if (uninitializedNodeTypes == 0) {
+            log.error("Unexpected call of nodeViewInitialized()");
+            return;
+          }
+          uninitializedNodeTypes--;
+          if (uninitializedNodeTypes == 0) {
             for (Listener listener : listeners) {
               try {
-                listener.nodesRemoved(nodesRemoved);
+                listener.nodeViewInitialized();
               }
               catch (Exception ex) {
-                log.error(ex, "Listener[%s].nodesRemoved(%s) threw exception. Ignored.", listener, nodesRemoved);
+                log.error(ex, "Listener[%s].nodeViewInitialized() threw exception. Ignored.", listener);
               }
             }
           }
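
ServiceDruidNodeDiscovery previously tracked a HashSet of per-node-type listeners and inferred initialization from when that set drained; it now registers one shared FilteringUpstreamListener per watched node type and counts down a plain int as each type reports nodeViewInitialized(). The countdown, condensed into a self-contained sketch (every call happens under one lock, so no atomics are needed):

    import java.util.ArrayList;
    import java.util.List;

    class InitCountdown
    {
      private final Object lock = new Object();
      private final List<Runnable> downstreamListeners = new ArrayList<>();
      private int uninitializedNodeTypes;

      InitCountdown(int watchedNodeTypes)
      {
        this.uninitializedNodeTypes = watchedNodeTypes;
      }

      // Called once per watched node type; notifies downstream exactly once,
      // after the last type has reported in.
      void nodeViewInitialized()
      {
        synchronized (lock) {
          if (uninitializedNodeTypes == 0) {
            return; // unexpected extra signal; the real code logs an error here
          }
          uninitializedNodeTypes--;
          if (uninitializedNodeTypes == 0) {
            downstreamListeners.forEach(Runnable::run);
          }
        }
      }
    }
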
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java
index f06ffd0..4c6efc0 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java
@@ -61,33 +61,25 @@ public class AppenderatorDriverMetadata
       final Map<String, List<SegmentWithState>> newMetadata = new HashMap<>();
       final Set<String> activeSegmentsAlreadySeen = new HashSet<>(); // temp data structure
 
-      activeSegments.entrySet()
-                    .forEach(sequenceSegments -> newMetadata.put(
-                        sequenceSegments.getKey(),
-                        sequenceSegments.getValue()
-                                        .stream()
-                                        .map(segmentIdentifier -> {
-                                          activeSegmentsAlreadySeen.add(segmentIdentifier.toString());
-                                          return SegmentWithState.newSegment(segmentIdentifier);
-                                        })
-                                        .collect(Collectors.toList())
-                    ));
+      activeSegments.forEach((String sequence, List<SegmentIdentifier> sequenceSegments) -> newMetadata.put(
+          sequence,
+          sequenceSegments
+              .stream()
+              .map(segmentIdentifier -> {
+                activeSegmentsAlreadySeen.add(segmentIdentifier.toString());
+                return SegmentWithState.newSegment(segmentIdentifier);
+              })
+              .collect(Collectors.toList())
+      ));
       // publishPendingSegments is a superset of activeSegments
-      publishPendingSegments.entrySet()
-                            .forEach(sequenceSegments -> newMetadata.computeIfAbsent(
-                                sequenceSegments.getKey(),
-                                k -> new ArrayList<>()
-                            ).addAll(
-                                sequenceSegments.getValue()
-                                                .stream()
-                                                .filter(segmentIdentifier -> !activeSegmentsAlreadySeen.contains(
-                                                    segmentIdentifier.toString()))
-                                                .map(segmentIdentifier -> SegmentWithState.newSegment(
-                                                    segmentIdentifier,
-                                                    SegmentState.APPEND_FINISHED
-                                                ))
-                                                .collect(Collectors.toList())
-                            ));
+      publishPendingSegments.forEach((sequence, sequenceSegments) -> {
+        List<SegmentWithState> segmentWithStates = newMetadata.computeIfAbsent(sequence, seq -> new ArrayList<>());
+        sequenceSegments
+            .stream()
+            .filter(segmentIdentifier -> !activeSegmentsAlreadySeen.contains(segmentIdentifier.toString()))
+            .map(segmentIdentifier -> SegmentWithState.newSegment(segmentIdentifier, SegmentState.APPEND_FINISHED))
+            .forEach(segmentWithStates::add);
+      });
       this.segments = newMetadata;
     } else {
       this.segments = segments;
diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
index 1ad59fb..40100fb 100644
--- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
+++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
@@ -78,7 +78,7 @@ public class LookupNodeDiscovery
   {
     ImmutableSet.Builder<String> builder = new ImmutableSet.Builder<>();
 
-    druidNodeDiscovery.getAllNodes().stream().forEach(
+    druidNodeDiscovery.getAllNodes().forEach(
         node -> builder.add(((LookupNodeService) node.getServices()
                                                      .get(LookupNodeService.DISCOVERY_SERVICE_KEY)).getLookupTier())
     );
diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
index fcc3303..9263b35 100644
--- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
+++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
@@ -41,6 +41,7 @@ import org.apache.druid.server.coordinator.rules.Rule;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -131,7 +132,7 @@ public class TieredBrokerHostSelector<T>
           new DruidNodeDiscovery.Listener()
           {
             @Override
-            public void nodesAdded(List<DiscoveryDruidNode> nodes)
+            public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
             {
               nodes.forEach(
                   (node) -> {
@@ -144,7 +145,7 @@ public class TieredBrokerHostSelector<T>
             }
 
             @Override
-            public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+            public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
             {
               nodes.forEach(
                   (node) -> {
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index 01097e6..be49011 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -281,6 +281,7 @@ public class HttpServerInventoryViewTest
     public void registerListener(Listener listener)
     {
       listener.nodesAdded(ImmutableList.of());
+      listener.nodeViewInitialized();
       this.listener = listener;
     }
   }
diff --git a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
index e4f577e..fcf6b6b 100644
--- a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
+++ b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
@@ -39,7 +39,6 @@ import org.junit.Test;
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 /**
@@ -129,13 +128,13 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
         new DruidNodeDiscovery.Listener()
         {
           @Override
-          public void nodesAdded(List<DiscoveryDruidNode> nodes)
+          public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
           {
             coordNodes.addAll(nodes);
           }
 
           @Override
-          public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+          public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
           {
             coordNodes.removeAll(nodes);
           }
@@ -147,13 +146,13 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
         new DruidNodeDiscovery.Listener()
         {
           @Override
-          public void nodesAdded(List<DiscoveryDruidNode> nodes)
+          public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
           {
             overlordNodes.addAll(nodes);
           }
 
           @Override
-          public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+          public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
           {
             overlordNodes.removeAll(nodes);
           }
diff --git a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
index d361df5..2b5722d 100644
--- a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
@@ -28,6 +28,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -47,13 +48,13 @@ public class DruidNodeDiscoveryProviderTest
         new DruidNodeDiscovery.Listener()
         {
           @Override
-          public void nodesAdded(List<DiscoveryDruidNode> nodes)
+          public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
           {
             dataNodes.addAll(nodes);
           }
 
           @Override
-          public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+          public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
           {
             dataNodes.removeAll(nodes);
           }
@@ -66,13 +67,13 @@ public class DruidNodeDiscoveryProviderTest
         new DruidNodeDiscovery.Listener()
         {
           @Override
-          public void nodesAdded(List<DiscoveryDruidNode> nodes)
+          public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
           {
             lookupNodes.addAll(nodes);
           }
 
           @Override
-          public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+          public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
           {
             lookupNodes.removeAll(nodes);
           }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 2b4a8c5..0810a02 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -71,7 +71,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 /**
  * This tests zookeeper specific coordinator/load queue/historical interactions, such as moving segments by the balancer
@@ -383,7 +382,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
     // clean up drop from load queue
     curator.delete().guaranteed().forPath(ZKPaths.makePath(SOURCE_LOAD_PATH, segmentToMove.getIdentifier()));
 
-    List<DruidServer> servers = serverView.getInventory().stream().collect(Collectors.toList());
+    List<DruidServer> servers = new ArrayList<>(serverView.getInventory());
 
     Assert.assertEquals(2, servers.get(0).getSegments().size());
     Assert.assertEquals(2, servers.get(1).getSegments().size());
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index a854914..56ca7ec 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -185,6 +185,7 @@ public class HttpLoadQueuePeonTest
     public void registerListener(Listener listener)
     {
       listener.nodesAdded(ImmutableList.of());
+      listener.nodeViewInitialized();
       this.listener = listener;
     }
   }
diff --git a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
index 2ef2b85..81f2c10 100644
--- a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
+++ b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
@@ -103,6 +103,7 @@ public class TieredBrokerHostSelectorTest
       public void registerListener(Listener listener)
       {
         listener.nodesAdded(ImmutableList.of(node1, node2, node3));
+        listener.nodeViewInitialized();
       }
     };
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java
index 3b4985c..7f0ab00 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java
@@ -34,8 +34,8 @@ import org.apache.druid.sql.calcite.expression.TimeUnits;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
-import java.util.Arrays;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class CeilOperatorConversion implements SqlOperatorConversion
 {
@@ -80,14 +80,17 @@ public class CeilOperatorConversion implements SqlOperatorConversion
       // So there is no simple extraction for this operator.
       return DruidExpression.fromFunctionCall(
           "timestamp_ceil",
-          Arrays.asList(
-              druidExpression.getExpression(),
-              DruidExpression.stringLiteral(granularity.getPeriod().toString()),
-              DruidExpression.numberLiteral(
-                  granularity.getOrigin() == null ? null : granularity.getOrigin().getMillis()
-              ),
-              DruidExpression.stringLiteral(granularity.getTimeZone().toString())
-          ).stream().map(DruidExpression::fromExpression).collect(Collectors.toList())
+          Stream
+              .of(
+                  druidExpression.getExpression(),
+                  DruidExpression.stringLiteral(granularity.getPeriod().toString()),
+                  DruidExpression.numberLiteral(
+                      granularity.getOrigin() == null ? null : granularity.getOrigin().getMillis()
+                  ),
+                  DruidExpression.stringLiteral(granularity.getTimeZone().toString())
+              )
+              .map(DruidExpression::fromExpression)
+              .collect(Collectors.toList())
       );
     } else {
       // WTF? CEIL with 3 arguments?
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
index 5be0036..a3c35c0 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
@@ -92,8 +92,8 @@ public class GroupByRules
     final List<Aggregation> existingAggregationsWithSameFilter = new ArrayList<>();
     for (Aggregation existingAggregation : existingAggregations) {
       if (filter == null) {
-        final boolean doesMatch = existingAggregation.getAggregatorFactories().stream().allMatch(
-            factory -> !(factory instanceof FilteredAggregatorFactory)
+        final boolean doesMatch = existingAggregation.getAggregatorFactories().stream().noneMatch(
+            factory -> factory instanceof FilteredAggregatorFactory
         );
 
         if (doesMatch) {
@@ -160,6 +160,6 @@ public class GroupByRules
         .map(AggregatorFactory::getName)
         .collect(Collectors.toSet());
 
-    return aggregation.getPostAggregator().getDependentFields().stream().allMatch(existingAggregationNames::contains);
+    return existingAggregationNames.containsAll(aggregation.getPostAggregator().getDependentFields());
   }
 }
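
The GroupByRules edits are pure stream-API simplifications of the kind the new SimplifyStreamApiCallChains inspection enforces: a negated allMatch() becomes noneMatch(), and an element-wise membership check becomes Collection.containsAll(). In isolation, with illustrative types standing in for the aggregator classes:

    import java.util.List;
    import java.util.Set;

    class StreamSimplifications
    {
      static void examples(List<Object> factories, Set<String> names, List<String> fields)
      {
        // Before: the negation is buried inside the predicate.
        boolean noneFilteredBefore = factories.stream().allMatch(f -> !(f instanceof String));
        // After: noneMatch states the intent directly.
        boolean noneFilteredAfter = factories.stream().noneMatch(f -> f instanceof String);

        // Before: streaming just to test membership of every element.
        boolean coveredBefore = fields.stream().allMatch(names::contains);
        // After: equivalent, without the stream machinery.
        boolean coveredAfter = names.containsAll(fields);
      }
    }
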

