You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by le...@apache.org on 2018/12/01 00:13:03 UTC
[incubator-druid] branch master updated: Simplify DruidNodeDiscoveryProvider; add DruidNodeDiscovery.Listener.nodeViewInitialized() (#6606)
This is an automated email from the ASF dual-hosted git repository.
leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new ec38df7 Simplify DruidNodeDiscoveryProvider; add DruidNodeDiscovery.Listener.nodeViewInitialized() (#6606)
ec38df7 is described below
commit ec38df75756b975461bf1234e26cd195c462107d
Author: Roman Leventov <le...@gmail.com>
AuthorDate: Sat Dec 1 01:12:56 2018 +0100
Simplify DruidNodeDiscoveryProvider; add DruidNodeDiscovery.Listener.nodeViewInitialized() (#6606)
* Simplify DruidNodeDiscoveryProvider; add DruidNodeDiscovery.Listener.nodeViewInitialized() method; prohibit and eliminate some suboptimal Java 8 patterns
* Fix style
* Fix HttpEmitterTest.timeoutEmptyQueue()
* Add DruidNodeDiscovery.Listener.nodeViewInitialized() calls in tests
* Clarify code
---
.idea/inspectionProfiles/Druid.xml | 2 +
.../java/util/emitter/core/HttpPostEmitter.java | 31 +++---
.../data/input/impl/FileIteratingFirehoseTest.java | 3 +-
.../java/util/emitter/core/HttpEmitterTest.java | 12 ++-
.../indexing/kafka/LegacyKafkaIndexTaskRunner.java | 7 +-
.../overlord/hrtr/HttpRemoteTaskRunner.java | 16 +--
.../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 1 +
.../apache/druid/query/filter/AndDimFilter.java | 3 +-
.../org/apache/druid/query/filter/OrDimFilter.java | 3 +-
.../apache/druid/query/groupby/GroupByQuery.java | 2 +-
.../druid/segment/IndexMergerNullHandlingTest.java | 2 +-
.../druid/client/HttpServerInventoryView.java | 22 ++---
.../CuratorDruidNodeDiscoveryProvider.java | 82 ++++++++--------
.../apache/druid/discovery/DruidNodeDiscovery.java | 17 ++--
.../discovery/DruidNodeDiscoveryProvider.java | 109 ++++++++++++++-------
.../appenderator/AppenderatorDriverMetadata.java | 44 ++++-----
.../server/lookup/cache/LookupNodeDiscovery.java | 2 +-
.../server/router/TieredBrokerHostSelector.java | 5 +-
.../druid/client/HttpServerInventoryViewTest.java | 1 +
.../CuratorDruidNodeAnnouncerAndDiscoveryTest.java | 9 +-
.../discovery/DruidNodeDiscoveryProviderTest.java | 9 +-
.../coordinator/CuratorDruidCoordinatorTest.java | 3 +-
.../server/coordinator/HttpLoadQueuePeonTest.java | 1 +
.../router/TieredBrokerHostSelectorTest.java | 1 +
.../expression/builtin/CeilOperatorConversion.java | 21 ++--
.../druid/sql/calcite/rule/GroupByRules.java | 6 +-
26 files changed, 236 insertions(+), 178 deletions(-)
diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index 6bb556d..dfedeb4 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -76,6 +76,7 @@
<inspection_tool class="InvalidComparatorMethodReference" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="IteratorHasNextCallsIteratorNext" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="IteratorNextDoesNotThrowNoSuchElementException" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="Java8MapForEach" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="JsonDuplicatePropertyKeys" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="JsonStandardCompliance" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="LengthOneStringInIndexOf" enabled="true" level="ERROR" enabled_by_default="true" />
@@ -229,6 +230,7 @@
<constraint name="E" within="" contains="" />
</searchConfiguration>
</inspection_tool>
+ <inspection_tool class="SimplifyStreamApiCallChains" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
<option name="processCode" value="true" />
<option name="processLiterals" value="true" />
diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
index 8b486cb..d2da5a7 100644
--- a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
+++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
@@ -140,7 +140,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
private final AtomicInteger allocatedBuffers = new AtomicInteger();
private final AtomicInteger droppedBuffers = new AtomicInteger();
- private volatile long lastFillTimeMillis;
+ private volatile long lastBatchFillTimeMillis;
private final ConcurrentTimeCounter batchFillingTimeCounter = new ConcurrentTimeCounter();
private final Object startLock = new Object();
@@ -180,8 +180,8 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
emittingThread = new EmittingThread(config);
long firstBatchNumber = 1;
concurrentBatch.set(new Batch(this, acquireBuffer(), firstBatchNumber));
- // lastFillTimeMillis must not be 0, minHttpTimeoutMillis could be.
- lastFillTimeMillis = Math.max(config.minHttpTimeoutMillis, 1);
+ // lastBatchFillTimeMillis must not be 0, minHttpTimeoutMillis could be.
+ lastBatchFillTimeMillis = Math.max(config.minHttpTimeoutMillis, 1);
}
@Override
@@ -328,7 +328,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
if (elapsedTimeMillis > 0) {
// If elapsedTimeMillis is 0 or negative, it's likely because System.currentTimeMillis() is not monotonic, so not
// accounting this time for determining batch sending timeout.
- lastFillTimeMillis = elapsedTimeMillis;
+ lastBatchFillTimeMillis = elapsedTimeMillis;
}
addBatchToEmitQueue(batch);
wakeUpEmittingThread();
@@ -663,7 +663,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
*/
private boolean sendWithRetries(final byte[] buffer, final int length, final int eventCount, boolean withTimeout)
{
- long deadLineMillis = System.currentTimeMillis() + sendRequestTimeoutMillis(lastFillTimeMillis);
+ long deadLineMillis = System.currentTimeMillis() + computeTimeoutForSendRequestInMillis(lastBatchFillTimeMillis);
try {
RetryUtils.retry(
new RetryUtils.Task<Object>()
@@ -709,8 +709,8 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
private void send(byte[] buffer, int length) throws Exception
{
- long lastFillTimeMillis = HttpPostEmitter.this.lastFillTimeMillis;
- final long timeoutMillis = sendRequestTimeoutMillis(lastFillTimeMillis);
+ long lastFillTimeMillis = HttpPostEmitter.this.lastBatchFillTimeMillis;
+ final long timeoutMillis = computeTimeoutForSendRequestInMillis(lastFillTimeMillis);
if (timeoutMillis < config.getMinHttpTimeoutMillis()) {
throw timeoutLessThanMinimumException;
}
@@ -795,18 +795,27 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
accountSuccessfulSending(sendingStartMs);
}
- private long sendRequestTimeoutMillis(long lastFillTimeMillis)
+ /**
+ * This method computes the timeout for sending a batch of events over HTTP, based on how much time it took to
+ * populate that batch. The idea is that if it took X milliseconds to fill the batch, we shouldn't wait for more than
+ * X * {@link HttpEmitterConfig#httpTimeoutAllowanceFactor} milliseconds to send that data, because at the same time
+ * the next batch is probably being filled at the same speed, so we have to keep up with that speed.
+ *
+ * Ideally it should use something like a moving average instead of the plain last batch fill time in order to
+ * accommodate emitting bursts, but that might be unnecessary because Druid applications might not produce events in bursts.
+ */
+ private long computeTimeoutForSendRequestInMillis(long lastBatchFillTimeMillis)
{
int emitQueueSize = approximateBuffersToEmitCount.get();
if (emitQueueSize < EMIT_QUEUE_THRESHOLD_1) {
- return (long) (lastFillTimeMillis * config.httpTimeoutAllowanceFactor);
+ return (long) (lastBatchFillTimeMillis * config.httpTimeoutAllowanceFactor);
}
if (emitQueueSize < EMIT_QUEUE_THRESHOLD_2) {
// The idea is to not let buffersToEmit queue to grow faster than we can emit buffers.
- return (long) (lastFillTimeMillis * EQUILIBRIUM_ALLOWANCE_FACTOR);
+ return (long) (lastBatchFillTimeMillis * EQUILIBRIUM_ALLOWANCE_FACTOR);
}
// If buffersToEmit still grows, try to restrict even more
- return (long) (lastFillTimeMillis * TIGHT_ALLOWANCE_FACTOR);
+ return (long) (lastBatchFillTimeMillis * TIGHT_ALLOWANCE_FACTOR);
}
private void accountSuccessfulSending(long sendingStartMs)
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java b/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java
index 09f5ded..7683258 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/FileIteratingFirehoseTest.java
@@ -91,7 +91,8 @@ public class FileIteratingFirehoseTest
this.expectedResults = inputs.stream()
.map(input -> input.split("\n"))
.flatMap(lines -> {
- final List<String> filteredLines = Arrays.asList(lines).stream()
+ final List<String> filteredLines = Arrays
+ .stream(lines)
.filter(line -> line.length() > 0)
.map(line -> line.split(",")[1])
.collect(Collectors.toList());
diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
index 812e665..07acd4a 100644
--- a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java
@@ -24,6 +24,7 @@ import com.google.common.primitives.Ints;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
+import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -65,21 +66,26 @@ public class HttpEmitterTest
@Test
public void timeoutEmptyQueue() throws IOException, InterruptedException
{
+ float timeoutAllowanceFactor = 2.0f;
final HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
- .setHttpTimeoutAllowanceFactor(2.0f)
+ .setHttpTimeoutAllowanceFactor(timeoutAllowanceFactor)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper);
+ long startMs = System.currentTimeMillis();
emitter.start();
emitter.emitAndReturnBatch(new IntEvent());
emitter.flush();
- Assert.assertTrue(timeoutUsed.get() < 5);
+ long fillTimeMs = System.currentTimeMillis() - startMs;
+ Assert.assertThat((double) timeoutUsed.get(), Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
+ startMs = System.currentTimeMillis();
final Batch batch = emitter.emitAndReturnBatch(new IntEvent());
Thread.sleep(1000);
batch.seal();
emitter.flush();
- Assert.assertTrue(timeoutUsed.get() >= 2000 && timeoutUsed.get() < 3000);
+ fillTimeMs = System.currentTimeMillis() - startMs;
+ Assert.assertThat((double) timeoutUsed.get(), Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
}
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index 0a4dac7..75d2688 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -86,6 +86,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -454,9 +455,9 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
if (isPersistRequired) {
driver.persist(committerSupplier.get());
}
- segmentsToMoveOut.entrySet().forEach(sequenceSegments -> driver.moveSegmentOut(
- sequenceSegments.getKey(),
- sequenceSegments.getValue().stream().collect(Collectors.toList())
+ segmentsToMoveOut.forEach((String sequence, Set<SegmentIdentifier> segments) -> driver.moveSegmentOut(
+ sequence,
+ new ArrayList<SegmentIdentifier>(segments)
));
}
catch (ParseException e) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index bf96b10..5cb99ba 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -443,18 +443,22 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
new DruidNodeDiscovery.Listener()
{
@Override
- public void nodesAdded(List<DiscoveryDruidNode> nodes)
+ public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{
- nodes.stream().forEach(node -> addWorker(toWorker(node)));
+ nodes.forEach(node -> addWorker(toWorker(node)));
+ }
- //CountDownLatch.countDown() does nothing when count has already reached 0.
- workerViewInitialized.countDown();
+ @Override
+ public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
+ {
+ nodes.forEach(node -> removeWorker(toWorker(node)));
}
@Override
- public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+ public void nodeViewInitialized()
{
- nodes.stream().forEach(node -> removeWorker(toWorker(node)));
+ //CountDownLatch.countDown() does nothing when count has already reached 0.
+ workerViewInitialized.countDown();
}
}
);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index a130883..25e172a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -1380,6 +1380,7 @@ public class HttpRemoteTaskRunnerTest
public void registerListener(Listener listener)
{
listener.nodesAdded(ImmutableList.of());
+ listener.nodeViewInitialized();
this.listener = listener;
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java b/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java
index 9537ff8..9903633 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/AndDimFilter.java
@@ -102,8 +102,7 @@ public class AndDimFilter implements DimFilter
public HashSet<String> getRequiredColumns()
{
HashSet<String> requiredColumns = new HashSet<>();
- fields.stream()
- .forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
+ fields.forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
return requiredColumns;
}
diff --git a/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java b/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java
index ce89431..d2ed4e7 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/OrDimFilter.java
@@ -110,8 +110,7 @@ public class OrDimFilter implements DimFilter
public HashSet<String> getRequiredColumns()
{
HashSet<String> requiredColumns = new HashSet<>();
- fields.stream()
- .forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
+ fields.forEach(field -> requiredColumns.addAll(field.getRequiredColumns()));
return requiredColumns;
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index 1c7fb6c..8ab4601 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -737,7 +737,7 @@ public class GroupByQuery extends BaseQuery<Row>
private static Map<String, AggregatorFactory> getAggregatorsMap(List<AggregatorFactory> aggregatorSpecs)
{
Map<String, AggregatorFactory> map = new HashMap<>(aggregatorSpecs.size());
- aggregatorSpecs.stream().forEach(v -> map.put(v.getName(), v));
+ aggregatorSpecs.forEach(v -> map.put(v.getName(), v));
return map;
}
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
index 61dc753..f2f1cf0 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
@@ -119,7 +119,7 @@ public class IndexMergerNullHandlingTest
try (QueryableIndex index = indexIO.loadIndex(indexMerger.persist(toPersist, tempDir, indexSpec, null))) {
final ColumnHolder columnHolder = index.getColumnHolder("d");
- if (subsetList.stream().allMatch(nullFlavors::contains)) {
+ if (nullFlavors.containsAll(subsetList)) {
// all null -> should be missing
Assert.assertNull(subsetList.toString(), columnHolder);
} else {
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index 855f582..bf0d123 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -152,23 +152,23 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
private final AtomicBoolean initialized = new AtomicBoolean(false);
@Override
- public void nodesAdded(List<DiscoveryDruidNode> nodes)
+ public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{
- nodes.forEach(
- node -> serverAdded(toDruidServer(node))
- );
+ nodes.forEach(node -> serverAdded(toDruidServer(node)));
+ }
- if (!initialized.getAndSet(true)) {
- executor.execute(HttpServerInventoryView.this::serverInventoryInitialized);
- }
+ @Override
+ public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
+ {
+ nodes.forEach(node -> serverRemoved(toDruidServer(node)));
}
@Override
- public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+ public void nodeViewInitialized()
{
- nodes.forEach(
- node -> serverRemoved(toDruidServer(node))
- );
+ if (!initialized.getAndSet(true)) {
+ executor.execute(HttpServerInventoryView.this::serverInventoryInitialized);
+ }
}
private DruidServer toDruidServer(DiscoveryDruidNode node)
diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
index 46b8ce9..7025319 100644
--- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.initialization.ZkPathsConfig;
+import javax.annotation.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -156,6 +157,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
// hostAndPort -> DiscoveryDruidNode
private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
+ private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
private final PathChildrenCache cache;
private final ExecutorService cacheExecutor;
@@ -166,7 +168,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
private final Object lock = new Object();
- private CountDownLatch cacheInitialized = new CountDownLatch(1);
+ private final CountDownLatch cacheInitialized = new CountDownLatch(1);
NodeTypeWatcher(
ExecutorService listenerExecutor,
@@ -195,30 +197,39 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
- if (!isCacheInitialized(30, TimeUnit.SECONDS)) {
+ boolean nodeViewInitialized;
+ try {
+ nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ nodeViewInitialized = false;
+ }
+ if (!nodeViewInitialized) {
log.info("cache is not initialized yet. getAllNodes() might not return full information.");
}
- return Collections.unmodifiableCollection(nodes.values());
+ return unmodifiableNodes;
}
@Override
public void registerListener(DruidNodeDiscovery.Listener listener)
{
synchronized (lock) {
- if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
- ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
+ // No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
+ if (cacheInitialized.getCount() == 0) {
safeSchedule(
() -> {
- listener.nodesAdded(currNodes);
+ listener.nodesAdded(unmodifiableNodes);
+ listener.nodeViewInitialized();
},
- "Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, listener
+ "Exception occured in nodesAdded([%s]) in listener [%s].", unmodifiableNodes, listener
);
}
nodeListeners.add(listener);
}
}
- public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent event)
+ void handleChildEvent(PathChildrenCacheEvent event)
{
synchronized (lock) {
try {
@@ -278,20 +289,24 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
break;
}
case INITIALIZED: {
- if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
+ // No need to wait on CountDownLatch, because we are holding the lock under which it could only be
+ // counted down.
+ if (cacheInitialized.getCount() == 0) {
log.warn("cache is already initialized. ignoring [%s] event.", event.getType());
return;
}
log.info("Received INITIALIZED in node watcher.");
- ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
- for (Listener l : nodeListeners) {
+ for (Listener listener : nodeListeners) {
safeSchedule(
() -> {
- l.nodesAdded(currNodes);
+ listener.nodesAdded(unmodifiableNodes);
+ listener.nodeViewInitialized();
},
- "Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, l
+ "Exception occured in nodesAdded([%s]) in listener [%s].",
+ unmodifiableNodes,
+ listener
);
}
@@ -309,17 +324,6 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
}
}
- private boolean isCacheInitialized(long waitFor, TimeUnit timeUnit)
- {
- try {
- return cacheInitialized.await(waitFor, timeUnit);
- }
- catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- return false;
- }
- }
-
private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args)
{
listenerExecutor.submit(() -> {
@@ -332,20 +336,20 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
});
}
+ @GuardedBy("lock")
private void addNode(DiscoveryDruidNode druidNode)
{
DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
if (prev == null) {
- if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
+ // No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
+ if (cacheInitialized.getCount() == 0) {
List<DiscoveryDruidNode> newNode = ImmutableList.of(druidNode);
- for (Listener l : nodeListeners) {
+ for (Listener listener : nodeListeners) {
safeSchedule(
- () -> {
- l.nodesAdded(newNode);
- },
+ () -> listener.nodesAdded(newNode),
"Exception occured in nodeAdded(node=[%s]) in listener [%s].",
druidNode.getDruidNode().getHostAndPortToUse(),
- l
+ listener
);
}
}
@@ -359,6 +363,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
}
}
+ @GuardedBy("lock")
private void removeNode(DiscoveryDruidNode druidNode)
{
DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());
@@ -372,14 +377,15 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
return;
}
- if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
+ // No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
+ if (cacheInitialized.getCount() == 0) {
List<DiscoveryDruidNode> nodeRemoved = ImmutableList.of(druidNode);
- for (Listener l : nodeListeners) {
+ for (Listener listener : nodeListeners) {
safeSchedule(
- () -> {
- l.nodesRemoved(nodeRemoved);
- },
- "Exception occured in nodeRemoved(node=[%s]) in listener [%s].", druidNode.getDruidNode().getHostAndPortToUse(), l
+ () -> listener.nodesRemoved(nodeRemoved),
+ "Exception occured in nodeRemoved(node=[%s]) in listener [%s].",
+ druidNode.getDruidNode().getHostAndPortToUse(),
+ listener
);
}
}
@@ -388,9 +394,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
public void start()
{
try {
- cache.getListenable().addListener(
- (client, event) -> handleChildEvent(client, event)
- );
+ cache.getListenable().addListener((client, event) -> handleChildEvent(event));
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
catch (Exception ex) {
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
index 807440b..d9148c3 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
@@ -20,7 +20,6 @@
package org.apache.druid.discovery;
import java.util.Collection;
-import java.util.List;
/**
* Interface for discovering Druid Nodes announced by DruidNodeAnnouncer.
@@ -39,14 +38,16 @@ public interface DruidNodeDiscovery
*/
interface Listener
{
+ void nodesAdded(Collection<DiscoveryDruidNode> nodes);
+
+ void nodesRemoved(Collection<DiscoveryDruidNode> nodes);
+
/**
- * List of nodes added.
- * First call to this method is also a signal that underlying cache in the DruidNodeDiscovery implementation
- * has been initialized.
- * @param nodes
+ * Called once when the underlying cache in the DruidNodeDiscovery implementation has been initialized.
*/
- void nodesAdded(List<DiscoveryDruidNode> nodes);
-
- void nodesRemoved(List<DiscoveryDruidNode> nodes);
+ default void nodeViewInitialized()
+ {
+ // do nothing
+ }
}
}
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
index 9ae34a3..79e73c1 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
@@ -19,7 +19,7 @@
package org.apache.druid.discovery;
-import com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.IAE;
@@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.logger.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -66,10 +65,11 @@ public abstract class DruidNodeDiscoveryProvider
if (nodeTypesToWatch == null) {
throw new IAE("Unknown service [%s].", service);
}
-
- ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service);
+ ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service, nodeTypesToWatch.size());
+ DruidNodeDiscovery.Listener filteringGatheringUpstreamListener =
+ serviceDiscovery.filteringUpstreamListener();
for (NodeType nodeType : nodeTypesToWatch) {
- getForNodeType(nodeType).registerListener(serviceDiscovery.nodeTypeListener());
+ getForNodeType(nodeType).registerListener(filteringGatheringUpstreamListener);
}
return serviceDiscovery;
}
@@ -82,55 +82,66 @@ public abstract class DruidNodeDiscoveryProvider
private final String service;
private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
+ private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
private final List<Listener> listeners = new ArrayList<>();
private final Object lock = new Object();
- private Set<NodeTypeListener> uninitializedNodeTypeListeners = new HashSet<>();
+ private int uninitializedNodeTypes;
- ServiceDruidNodeDiscovery(String service)
+ ServiceDruidNodeDiscovery(String service, int watchedNodeTypes)
{
+ Preconditions.checkArgument(watchedNodeTypes > 0);
this.service = service;
+ this.uninitializedNodeTypes = watchedNodeTypes;
}
@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
- return Collections.unmodifiableCollection(nodes.values());
+ return unmodifiableNodes;
}
@Override
public void registerListener(Listener listener)
{
+ if (listener instanceof FilteringUpstreamListener) {
+ throw new IAE("FilteringUpstreamListener should not be registered with ServiceDruidNodeDiscovery itself");
+ }
synchronized (lock) {
- if (uninitializedNodeTypeListeners.isEmpty()) {
- listener.nodesAdded(ImmutableList.copyOf(nodes.values()));
+ if (!unmodifiableNodes.isEmpty()) {
+ listener.nodesAdded(unmodifiableNodes);
+ }
+ if (uninitializedNodeTypes == 0) {
+ listener.nodeViewInitialized();
}
listeners.add(listener);
}
}
- NodeTypeListener nodeTypeListener()
+ DruidNodeDiscovery.Listener filteringUpstreamListener()
{
- NodeTypeListener nodeListener = new NodeTypeListener();
- uninitializedNodeTypeListeners.add(nodeListener);
- return nodeListener;
+ return new FilteringUpstreamListener();
}
- class NodeTypeListener implements DruidNodeDiscovery.Listener
+ /**
+ * Listens for all node updates and filters them based on {@link #service}. Note: this listener is registered with
+ * the objects returned from {@link #getForNodeType(NodeType)}, NOT with {@link ServiceDruidNodeDiscovery} itself.
+ */
+ class FilteringUpstreamListener implements DruidNodeDiscovery.Listener
{
@Override
- public void nodesAdded(List<DiscoveryDruidNode> nodesDiscovered)
+ public void nodesAdded(Collection<DiscoveryDruidNode> nodesDiscovered)
{
synchronized (lock) {
- ImmutableList.Builder<DiscoveryDruidNode> builder = ImmutableList.builder();
+ List<DiscoveryDruidNode> nodesAdded = new ArrayList<>();
for (DiscoveryDruidNode node : nodesDiscovered) {
if (node.getServices().containsKey(service)) {
DiscoveryDruidNode prev = nodes.putIfAbsent(node.getDruidNode().getHostAndPortToUse(), node);
if (prev == null) {
- builder.add(node);
+ nodesAdded.add(node);
} else {
log.warn("Node[%s] discovered but already exists [%s].", node, prev);
}
@@ -139,48 +150,70 @@ public abstract class DruidNodeDiscoveryProvider
}
}
- ImmutableList<DiscoveryDruidNode> newNodesAdded = null;
- if (uninitializedNodeTypeListeners.isEmpty()) {
- newNodesAdded = builder.build();
- } else if (uninitializedNodeTypeListeners.remove(this) && uninitializedNodeTypeListeners.isEmpty()) {
- newNodesAdded = ImmutableList.copyOf(nodes.values());
+ if (nodesAdded.isEmpty()) {
+ // Don't bother listeners with an empty update, it doesn't make sense.
+ return;
}
- if (newNodesAdded != null) {
- for (Listener listener : listeners) {
- try {
- listener.nodesAdded(newNodesAdded);
- }
- catch (Exception ex) {
- log.error(ex, "Listener[%s].nodesAdded(%s) threw exception. Ignored.", listener, newNodesAdded);
- }
+ Collection<DiscoveryDruidNode> unmodifiableNodesAdded = Collections.unmodifiableCollection(nodesAdded);
+ for (Listener listener : listeners) {
+ try {
+ listener.nodesAdded(unmodifiableNodesAdded);
+ }
+ catch (Exception ex) {
+ log.error(ex, "Listener[%s].nodesAdded(%s) threw exception. Ignored.", listener, nodesAdded);
}
}
}
}
@Override
- public void nodesRemoved(List<DiscoveryDruidNode> nodesDisappeared)
+ public void nodesRemoved(Collection<DiscoveryDruidNode> nodesDisappeared)
{
synchronized (lock) {
- ImmutableList.Builder<DiscoveryDruidNode> builder = ImmutableList.builder();
+ List<DiscoveryDruidNode> nodesRemoved = new ArrayList<>();
for (DiscoveryDruidNode node : nodesDisappeared) {
DiscoveryDruidNode prev = nodes.remove(node.getDruidNode().getHostAndPortToUse());
if (prev != null) {
- builder.add(node);
+ nodesRemoved.add(node);
} else {
log.warn("Node[%s] disappeared but was unknown for service listener [%s].", node, service);
}
}
- if (uninitializedNodeTypeListeners.isEmpty()) {
- ImmutableList<DiscoveryDruidNode> nodesRemoved = builder.build();
+ if (nodesRemoved.isEmpty()) {
+ // Don't bother listeners with an empty update, it doesn't make sense.
+ return;
+ }
+
+ Collection<DiscoveryDruidNode> unmodifiableNodesRemoved = Collections.unmodifiableCollection(nodesRemoved);
+ for (Listener listener : listeners) {
+ try {
+ listener.nodesRemoved(unmodifiableNodesRemoved);
+ }
+ catch (Exception ex) {
+ log.error(ex, "Listener[%s].nodesRemoved(%s) threw exception. Ignored.", listener, nodesRemoved);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void nodeViewInitialized()
+ {
+ synchronized (lock) {
+ if (uninitializedNodeTypes == 0) {
+ log.error("Unexpected call of nodeViewInitialized()");
+ return;
+ }
+ uninitializedNodeTypes--;
+ if (uninitializedNodeTypes == 0) {
for (Listener listener : listeners) {
try {
- listener.nodesRemoved(nodesRemoved);
+ listener.nodeViewInitialized();
}
catch (Exception ex) {
- log.error(ex, "Listener[%s].nodesRemoved(%s) threw exception. Ignored.", listener, nodesRemoved);
+ log.error(ex, "Listener[%s].nodeViewInitialized() threw exception. Ignored.", listener);
}
}
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java
index f06ffd0..4c6efc0 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java
@@ -61,33 +61,25 @@ public class AppenderatorDriverMetadata
final Map<String, List<SegmentWithState>> newMetadata = new HashMap<>();
final Set<String> activeSegmentsAlreadySeen = new HashSet<>(); // temp data structure
- activeSegments.entrySet()
- .forEach(sequenceSegments -> newMetadata.put(
- sequenceSegments.getKey(),
- sequenceSegments.getValue()
- .stream()
- .map(segmentIdentifier -> {
- activeSegmentsAlreadySeen.add(segmentIdentifier.toString());
- return SegmentWithState.newSegment(segmentIdentifier);
- })
- .collect(Collectors.toList())
- ));
+ activeSegments.forEach((String sequence, List<SegmentIdentifier> sequenceSegments) -> newMetadata.put(
+ sequence,
+ sequenceSegments
+ .stream()
+ .map(segmentIdentifier -> {
+ activeSegmentsAlreadySeen.add(segmentIdentifier.toString());
+ return SegmentWithState.newSegment(segmentIdentifier);
+ })
+ .collect(Collectors.toList())
+ ));
// publishPendingSegments is a superset of activeSegments
- publishPendingSegments.entrySet()
- .forEach(sequenceSegments -> newMetadata.computeIfAbsent(
- sequenceSegments.getKey(),
- k -> new ArrayList<>()
- ).addAll(
- sequenceSegments.getValue()
- .stream()
- .filter(segmentIdentifier -> !activeSegmentsAlreadySeen.contains(
- segmentIdentifier.toString()))
- .map(segmentIdentifier -> SegmentWithState.newSegment(
- segmentIdentifier,
- SegmentState.APPEND_FINISHED
- ))
- .collect(Collectors.toList())
- ));
+ publishPendingSegments.forEach((sequence, sequenceSegments) -> {
+ List<SegmentWithState> segmentWithStates = newMetadata.computeIfAbsent(sequence, seq -> new ArrayList<>());
+ sequenceSegments
+ .stream()
+ .filter(segmentIdentifier -> !activeSegmentsAlreadySeen.contains(segmentIdentifier.toString()))
+ .map(segmentIdentifier -> SegmentWithState.newSegment(segmentIdentifier, SegmentState.APPEND_FINISHED))
+ .forEach(segmentWithStates::add);
+ });
this.segments = newMetadata;
} else {
this.segments = segments;
diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
index 1ad59fb..40100fb 100644
--- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
+++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
@@ -78,7 +78,7 @@ public class LookupNodeDiscovery
{
ImmutableSet.Builder<String> builder = new ImmutableSet.Builder<>();
- druidNodeDiscovery.getAllNodes().stream().forEach(
+ druidNodeDiscovery.getAllNodes().forEach(
node -> builder.add(((LookupNodeService) node.getServices()
.get(LookupNodeService.DISCOVERY_SERVICE_KEY)).getLookupTier())
);
diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
index fcc3303..9263b35 100644
--- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
+++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
@@ -41,6 +41,7 @@ import org.apache.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
import org.joda.time.Interval;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -131,7 +132,7 @@ public class TieredBrokerHostSelector<T>
new DruidNodeDiscovery.Listener()
{
@Override
- public void nodesAdded(List<DiscoveryDruidNode> nodes)
+ public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{
nodes.forEach(
(node) -> {
@@ -144,7 +145,7 @@ public class TieredBrokerHostSelector<T>
}
@Override
- public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+ public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
{
nodes.forEach(
(node) -> {
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index 01097e6..be49011 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -281,6 +281,7 @@ public class HttpServerInventoryViewTest
public void registerListener(Listener listener)
{
listener.nodesAdded(ImmutableList.of());
+ listener.nodeViewInitialized();
this.listener = listener;
}
}
diff --git a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
index e4f577e..fcf6b6b 100644
--- a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
+++ b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
@@ -39,7 +39,6 @@ import org.junit.Test;
import java.util.Collection;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
/**
@@ -129,13 +128,13 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
new DruidNodeDiscovery.Listener()
{
@Override
- public void nodesAdded(List<DiscoveryDruidNode> nodes)
+ public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{
coordNodes.addAll(nodes);
}
@Override
- public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+ public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
{
coordNodes.removeAll(nodes);
}
@@ -147,13 +146,13 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
new DruidNodeDiscovery.Listener()
{
@Override
- public void nodesAdded(List<DiscoveryDruidNode> nodes)
+ public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{
overlordNodes.addAll(nodes);
}
@Override
- public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+ public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
{
overlordNodes.removeAll(nodes);
}
diff --git a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
index d361df5..2b5722d 100644
--- a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
@@ -28,6 +28,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -47,13 +48,13 @@ public class DruidNodeDiscoveryProviderTest
new DruidNodeDiscovery.Listener()
{
@Override
- public void nodesAdded(List<DiscoveryDruidNode> nodes)
+ public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{
dataNodes.addAll(nodes);
}
@Override
- public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+ public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
{
dataNodes.removeAll(nodes);
}
@@ -66,13 +67,13 @@ public class DruidNodeDiscoveryProviderTest
new DruidNodeDiscovery.Listener()
{
@Override
- public void nodesAdded(List<DiscoveryDruidNode> nodes)
+ public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{
lookupNodes.addAll(nodes);
}
@Override
- public void nodesRemoved(List<DiscoveryDruidNode> nodes)
+ public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
{
lookupNodes.removeAll(nodes);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 2b4a8c5..0810a02 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -71,7 +71,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
/**
* This tests zookeeper specific coordinator/load queue/historical interactions, such as moving segments by the balancer
@@ -383,7 +382,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
// clean up drop from load queue
curator.delete().guaranteed().forPath(ZKPaths.makePath(SOURCE_LOAD_PATH, segmentToMove.getIdentifier()));
- List<DruidServer> servers = serverView.getInventory().stream().collect(Collectors.toList());
+ List<DruidServer> servers = new ArrayList<>(serverView.getInventory());
Assert.assertEquals(2, servers.get(0).getSegments().size());
Assert.assertEquals(2, servers.get(1).getSegments().size());
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index a854914..56ca7ec 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -185,6 +185,7 @@ public class HttpLoadQueuePeonTest
public void registerListener(Listener listener)
{
listener.nodesAdded(ImmutableList.of());
+ listener.nodeViewInitialized();
this.listener = listener;
}
}
diff --git a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
index 2ef2b85..81f2c10 100644
--- a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
+++ b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
@@ -103,6 +103,7 @@ public class TieredBrokerHostSelectorTest
public void registerListener(Listener listener)
{
listener.nodesAdded(ImmutableList.of(node1, node2, node3));
+ listener.nodeViewInitialized();
}
};
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java
index 3b4985c..7f0ab00 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/CeilOperatorConversion.java
@@ -34,8 +34,8 @@ import org.apache.druid.sql.calcite.expression.TimeUnits;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
-import java.util.Arrays;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class CeilOperatorConversion implements SqlOperatorConversion
{
@@ -80,14 +80,17 @@ public class CeilOperatorConversion implements SqlOperatorConversion
// So there is no simple extraction for this operator.
return DruidExpression.fromFunctionCall(
"timestamp_ceil",
- Arrays.asList(
- druidExpression.getExpression(),
- DruidExpression.stringLiteral(granularity.getPeriod().toString()),
- DruidExpression.numberLiteral(
- granularity.getOrigin() == null ? null : granularity.getOrigin().getMillis()
- ),
- DruidExpression.stringLiteral(granularity.getTimeZone().toString())
- ).stream().map(DruidExpression::fromExpression).collect(Collectors.toList())
+ Stream
+ .of(
+ druidExpression.getExpression(),
+ DruidExpression.stringLiteral(granularity.getPeriod().toString()),
+ DruidExpression.numberLiteral(
+ granularity.getOrigin() == null ? null : granularity.getOrigin().getMillis()
+ ),
+ DruidExpression.stringLiteral(granularity.getTimeZone().toString())
+ )
+ .map(DruidExpression::fromExpression)
+ .collect(Collectors.toList())
);
} else {
// WTF? CEIL with 3 arguments?
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
index 5be0036..a3c35c0 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
@@ -92,8 +92,8 @@ public class GroupByRules
final List<Aggregation> existingAggregationsWithSameFilter = new ArrayList<>();
for (Aggregation existingAggregation : existingAggregations) {
if (filter == null) {
- final boolean doesMatch = existingAggregation.getAggregatorFactories().stream().allMatch(
- factory -> !(factory instanceof FilteredAggregatorFactory)
+ final boolean doesMatch = existingAggregation.getAggregatorFactories().stream().noneMatch(
+ factory -> factory instanceof FilteredAggregatorFactory
);
if (doesMatch) {
@@ -160,6 +160,6 @@ public class GroupByRules
.map(AggregatorFactory::getName)
.collect(Collectors.toSet());
- return aggregation.getPostAggregator().getDependentFields().stream().allMatch(existingAggregationNames::contains);
+ return existingAggregationNames.containsAll(aggregation.getPostAggregator().getDependentFields());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org