You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2020/06/09 03:16:17 UTC
[druid] branch master updated: Load broadcast datasources on broker
and tasks (#9971)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 771870a Load broadcast datasources on broker and tasks (#9971)
771870a is described below
commit 771870ae2d312d643e6d98f3d0af8a9618af9681
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Mon Jun 8 20:15:59 2020 -0700
Load broadcast datasources on broker and tasks (#9971)
* Load broadcast datasources on broker and tasks
* Add javadocs
* Support HTTP segment management
* Fix indexer maxSize
* inspection fix
* Make segment cache optional on non-historicals
* Fix build
* Fix inspections, some coverage, failed tests
* More tests
* Add CliIndexer to MainTest
* Fix inspection
* Rename UnprunedDataSegment to LoadableDataSegment
* Address PR comments
* Fix
---
.../druid/indexing/kafka/KafkaIndexTask.java | 6 +
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 1 +
.../druid/indexing/kinesis/KinesisIndexTask.java | 6 +
.../indexing/kinesis/KinesisIndexTaskTest.java | 2 +-
.../druid/indexing/common/task/AbstractTask.java | 6 +
.../task/AppenderatorDriverRealtimeIndexTask.java | 6 +
.../indexing/common/task/RealtimeIndexTask.java | 6 +
.../apache/druid/indexing/common/task/Task.java | 5 +
.../druid/indexing/overlord/ForkingTaskRunner.java | 7 +
.../AppenderatorDriverRealtimeIndexTaskTest.java | 1 +
.../druid/indexing/common/task/IndexTaskTest.java | 2 +
.../common/task/RealtimeIndexTaskTest.java | 6 +
server/pom.xml | 4 -
.../druid/client/CachingClusteredClient.java | 4 +-
.../java/org/apache/druid/client/DruidServer.java | 14 +-
.../discovery/DruidNodeDiscoveryProvider.java | 3 +-
.../druid/segment/loading/SegmentLoaderConfig.java | 5 +-
.../loading/SegmentLoaderLocalCacheManager.java | 1 -
...DataSegmentServerAnnouncerLifecycleHandler.java | 104 -----------
.../CoordinatorBasedSegmentHandoffNotifier.java | 2 +-
.../server/coordination/DruidServerMetadata.java | 12 +-
.../server/coordination/LoadableDataSegment.java | 81 +++++++++
.../coordination/SegmentChangeRequestLoad.java | 14 +-
.../coordination/SegmentLoadDropHandler.java | 28 ++-
.../druid/server/coordination/ServerType.java | 8 +
.../druid/server/coordinator/BalancerStrategy.java | 9 +-
.../CachingCostBalancerStrategyFactory.java | 6 +-
.../server/coordinator/CostBalancerStrategy.java | 8 +-
.../druid/server/coordinator/DruidCluster.java | 45 ++++-
.../druid/server/coordinator/DruidCoordinator.java | 2 +-
.../coordinator/DruidCoordinatorRuntimeParams.java | 23 ++-
.../server/coordinator/RandomBalancerStrategy.java | 5 +-
.../coordinator/ReservoirSegmentSampler.java | 16 +-
.../druid/server/coordinator/ServerHolder.java | 5 +
.../server/coordinator/duty/BalanceSegments.java | 5 +-
.../druid/server/coordinator/duty/RunRules.java | 14 +-
.../rules/BroadcastDistributionRule.java | 52 +++---
.../rules/ForeverBroadcastDistributionRule.java | 21 +--
.../rules/IntervalBroadcastDistributionRule.java | 28 +--
.../rules/PeriodBroadcastDistributionRule.java | 29 +---
.../druid/server/http/DataSourcesResource.java | 2 +-
...CoordinatorBasedSegmentHandoffNotifierTest.java | 2 +-
.../coordination/SegmentLoadDropHandlerTest.java | 193 ++++++++++++++++-----
.../server/coordination/ZkCoordinatorTest.java | 68 +++++++-
.../server/coordinator/BalanceSegmentsTest.java | 45 ++++-
.../server/coordinator/DruidClusterBuilder.java | 9 +-
.../coordinator/ReservoirSegmentSamplerTest.java | 7 +-
.../rules/BroadcastDistributionRuleSerdeTest.java | 19 +-
.../rules/BroadcastDistributionRuleTest.java | 18 +-
.../main/java/org/apache/druid/cli/CliBroker.java | 17 +-
.../main/java/org/apache/druid/cli/CliIndexer.java | 20 ++-
.../main/java/org/apache/druid/cli/CliPeon.java | 32 ++--
.../test/java/org/apache/druid/cli/MainTest.java | 4 +-
.../druid/sql/calcite/schema/DruidSchema.java | 6 +-
.../apache/druid/sql/calcite/CalciteQueryTest.java | 2 +-
55 files changed, 699 insertions(+), 347 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 9f7f997..6bf4086 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -191,4 +191,10 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
{
return TYPE;
}
+
+ @Override
+ public boolean supportsQueries()
+ {
+ return true;
+ }
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 46a6a04..eb41749 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -345,6 +345,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
INPUT_FORMAT
)
);
+ Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index bfd3758..9eee3bf 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -137,6 +137,12 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
return TYPE;
}
+ @Override
+ public boolean supportsQueries()
+ {
+ return true;
+ }
+
@VisibleForTesting
AWSCredentialsConfig getAwsCredentialsConfig()
{
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 142f81f..48ca434 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -347,8 +347,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
null,
false
)
-
);
+ Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 40745bf..728f3de 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -146,6 +146,12 @@ public abstract class AbstractTask implements Task
}
@Override
+ public boolean supportsQueries()
+ {
+ return false;
+ }
+
+ @Override
public String getClasspathPrefix()
{
return null;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index c96b7f5..9a786fa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -246,6 +246,12 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
}
@Override
+ public boolean supportsQueries()
+ {
+ return true;
+ }
+
+ @Override
public boolean isReady(TaskActionClient taskActionClient)
{
return true;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
index ed2ddd2..055a3fe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
@@ -197,6 +197,12 @@ public class RealtimeIndexTask extends AbstractTask
}
@Override
+ public boolean supportsQueries()
+ {
+ return true;
+ }
+
+ @Override
public boolean isReady(TaskActionClient taskActionClient)
{
return true;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index c069c73..4f18c81 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -147,6 +147,11 @@ public interface Task
<T> QueryRunner<T> getQueryRunner(Query<T> query);
/**
+ * @return true if this Task type is queryable, such as streaming ingestion tasks
+ */
+ boolean supportsQueries();
+
+ /**
* Returns an extra classpath that should be prepended to the default classpath when running this task. If no
* extra classpath should be prepended, this should return null or the empty string.
*/
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 2f1abc1..10f5e6b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -327,6 +327,13 @@ public class ForkingTaskRunner
command.add(nodeType);
}
+ // If the task type is queryable, we need to load broadcast segments on the peon, used for
+ // join queries
+ if (task.supportsQueries()) {
+ command.add("--loadBroadcastSegments");
+ command.add("true");
+ }
+
if (!taskFile.exists()) {
jsonMapper.writeValue(taskFile, task);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index de3fa29..513a590 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -337,6 +337,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
{
expectPublishedSegments(1);
final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null);
+ Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> statusFuture = runTask(task);
// Wait for firehose to show up, it starts off null.
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 8668a93..6e3fcf5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -229,6 +229,8 @@ public class IndexTaskTest extends IngestionTestBase
appenderatorsManager
);
+ Assert.assertFalse(indexTask.supportsQueries());
+
final List<DataSegment> segments = runTask(indexTask).rhs;
Assert.assertEquals(2, segments.size());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 8cc21d0..12ea214 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -189,6 +189,12 @@ public class RealtimeIndexTaskTest
Assert.assertEquals(task.getId(), task.getTaskResource().getAvailabilityGroup());
}
+ @Test(timeout = 60_000L)
+ public void testSupportsQueries()
+ {
+ final RealtimeIndexTask task = makeRealtimeTask(null);
+ Assert.assertTrue(task.supportsQueries());
+ }
@Test(timeout = 60_000L, expected = ExecutionException.class)
public void testHandoffTimeout() throws Exception
diff --git a/server/pom.xml b/server/pom.xml
index 5d68a91..ddce092 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -292,10 +292,6 @@
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
- <groupId>org.hibernate</groupId>
- <artifactId>hibernate-validator</artifactId>
- </dependency>
- <dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</dependency>
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index ae4dc16..c382be3 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -452,7 +452,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
Hasher hasher = Hashing.sha1().newHasher();
boolean hasOnlyHistoricalSegments = true;
for (SegmentServerSelector p : segments) {
- if (!p.getServer().pick().getServer().segmentReplicatable()) {
+ if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) {
hasOnlyHistoricalSegments = false;
break;
}
@@ -633,7 +633,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
if (isBySegment) {
serverResults = getBySegmentServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);
- } else if (!server.segmentReplicatable() || !populateCache) {
+ } else if (!server.isSegmentReplicationTarget() || !populateCache) {
serverResults = getSimpleServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);
} else {
serverResults = getAndCacheServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);
diff --git a/server/src/main/java/org/apache/druid/client/DruidServer.java b/server/src/main/java/org/apache/druid/client/DruidServer.java
index ddcba54..6c52866 100644
--- a/server/src/main/java/org/apache/druid/client/DruidServer.java
+++ b/server/src/main/java/org/apache/druid/client/DruidServer.java
@@ -137,9 +137,19 @@ public class DruidServer implements Comparable<DruidServer>
return metadata.getTier();
}
- public boolean segmentReplicatable()
+ public boolean isSegmentReplicationTarget()
{
- return metadata.segmentReplicatable();
+ return metadata.isSegmentReplicationTarget();
+ }
+
+ public boolean isSegmentBroadcastTarget()
+ {
+ return metadata.isSegmentBroadcastTarget();
+ }
+
+ public boolean isSegmentReplicationOrBroadcastTarget()
+ {
+ return metadata.isSegmentReplicationTarget() || metadata.isSegmentBroadcastTarget();
}
@JsonProperty
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
index 7332029..898ce7c 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
@@ -44,7 +44,8 @@ public abstract class DruidNodeDiscoveryProvider
private static final Map<String, Set<NodeRole>> SERVICE_TO_NODE_TYPES = ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY,
ImmutableSet.of(NodeRole.BROKER, NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER),
- DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER),
+ DataNodeService.DISCOVERY_SERVICE_KEY,
+ ImmutableSet.of(NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER, NodeRole.BROKER),
WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.MIDDLE_MANAGER, NodeRole.INDEXER)
);
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
index c6c5723..39b3bde 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
@@ -23,9 +23,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.druid.utils.JvmUtils;
-import org.hibernate.validator.constraints.NotEmpty;
import java.io.File;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -34,8 +34,7 @@ import java.util.concurrent.TimeUnit;
public class SegmentLoaderConfig
{
@JsonProperty
- @NotEmpty
- private List<StorageLocationConfig> locations = null;
+ private List<StorageLocationConfig> locations = Collections.emptyList();
@JsonProperty("lazyLoadOnStart")
private boolean lazyLoadOnStart = false;
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 398ad67..b2ac7e8 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -89,7 +89,6 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
this.indexIO = indexIO;
this.config = config;
this.jsonMapper = mapper;
-
this.locations = new ArrayList<>();
for (StorageLocationConfig locationConfig : config.getLocations()) {
locations.add(
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java b/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java
deleted file mode 100644
index e874a30..0000000
--- a/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.realtime;
-
-import com.google.common.base.Throwables;
-import com.google.inject.Inject;
-import org.apache.druid.concurrent.LifecycleLock;
-import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
-
-import java.io.IOException;
-
-/**
- * Ties the {@link DataSegmentServerAnnouncer} announce/unannounce to the lifecycle start and stop.
- *
- * Analogous to {@link org.apache.druid.server.coordination.SegmentLoadDropHandler} on the Historicals,
- * but without segment cache management.
- */
-@ManageLifecycle
-public class CliIndexerDataSegmentServerAnnouncerLifecycleHandler
-{
- private static final EmittingLogger LOG = new EmittingLogger(CliIndexerDataSegmentServerAnnouncerLifecycleHandler.class);
-
- private final DataSegmentServerAnnouncer dataSegmentServerAnnouncer;
-
- private final LifecycleLock lifecycleLock = new LifecycleLock();
-
- @Inject
- public CliIndexerDataSegmentServerAnnouncerLifecycleHandler(
- DataSegmentServerAnnouncer dataSegmentServerAnnouncer
- )
- {
- this.dataSegmentServerAnnouncer = dataSegmentServerAnnouncer;
- }
-
- @LifecycleStart
- public void start() throws IOException
- {
- if (!lifecycleLock.canStart()) {
- throw new RuntimeException("Lifecycle lock could not start");
- }
-
- try {
- if (lifecycleLock.isStarted()) {
- return;
- }
-
- LOG.info("Starting...");
- try {
- dataSegmentServerAnnouncer.announce();
- }
- catch (Exception e) {
- Throwables.propagateIfPossible(e, IOException.class);
- throw new RuntimeException(e);
- }
- LOG.info("Started.");
- lifecycleLock.started();
- }
- finally {
- lifecycleLock.exitStart();
- }
- }
-
- @LifecycleStop
- public void stop()
- {
- if (!lifecycleLock.canStop()) {
- throw new RuntimeException("Lifecycle lock could not stop");
- }
-
- if (!lifecycleLock.isStarted()) {
- return;
- }
-
- LOG.info("Stopping...");
- try {
- dataSegmentServerAnnouncer.unannounce();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- LOG.info("Stopped.");
- }
-}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
index bbee720..2e97258 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
@@ -142,7 +142,7 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
&& segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
== descriptor.getPartitionNumber()
&& segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
- && segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::segmentReplicatable)) {
+ && segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::isSegmentReplicationOrBroadcastTarget)) {
return true;
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java b/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java
index e3673bb..3fda41b 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java
@@ -107,11 +107,21 @@ public class DruidServerMetadata
return priority;
}
- public boolean segmentReplicatable()
+ public boolean isSegmentReplicationTarget()
{
return type.isSegmentReplicationTarget();
}
+ public boolean isSegmentBroadcastTarget()
+ {
+ return type.isSegmentBroadcastTarget();
+ }
+
+ public boolean isSegmentReplicationOrBroadcastTarget()
+ {
+ return isSegmentReplicationTarget() || isSegmentBroadcastTarget();
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
new file mode 100644
index 0000000..4f4f7a5
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.druid.jackson.CommaListJoinDeserializer;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A deserialization aid used by {@link SegmentChangeRequestLoad}. The broker prunes the loadSpec from segments
+ * for efficiency reasons, but the broker does need the loadSpec when it loads broadcast segments.
+ *
+ * This class always uses the non-pruning default {@link PruneSpecsHolder}.
+ */
+public class LoadableDataSegment extends DataSegment
+{
+ @JsonCreator
+ public LoadableDataSegment(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval,
+ @JsonProperty("version") String version,
+ // use `Map` *NOT* `LoadSpec` because we want to do lazy materialization to prevent dependency pollution
+ @JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
+ @JsonProperty("dimensions")
+ @JsonDeserialize(using = CommaListJoinDeserializer.class)
+ @Nullable
+ List<String> dimensions,
+ @JsonProperty("metrics")
+ @JsonDeserialize(using = CommaListJoinDeserializer.class)
+ @Nullable
+ List<String> metrics,
+ @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
+ @JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
+ @JsonProperty("binaryVersion") Integer binaryVersion,
+ @JsonProperty("size") long size,
+ @JacksonInject PruneSpecsHolder pruneSpecsHolder
+ )
+ {
+ super(
+ dataSource,
+ interval,
+ version,
+ loadSpec,
+ dimensions,
+ metrics,
+ shardSpec,
+ lastCompactionState,
+ binaryVersion,
+ size,
+ PruneSpecsHolder.DEFAULT
+ );
+
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
index 097e025..130c7b5 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
@@ -35,14 +35,26 @@ public class SegmentChangeRequestLoad implements DataSegmentChangeRequest
{
private final DataSegment segment;
+ /**
+ * To avoid pruning of the loadSpec on the broker, needed when the broker is loading broadcast segments,
+ * we deserialize into an {@link LoadableDataSegment}, which never removes the loadSpec.
+ */
@JsonCreator
public SegmentChangeRequestLoad(
- @JsonUnwrapped DataSegment segment
+ @JsonUnwrapped LoadableDataSegment segment
)
{
this.segment = segment;
}
+ public SegmentChangeRequestLoad(
+ DataSegment segment
+ )
+ {
+ this.segment = segment;
+ }
+
+
@Override
public void go(DataSegmentChangeHandler handler, @Nullable DataSegmentChangeCallback callback)
{
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 51aefc1..87a1936 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -34,6 +34,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -104,7 +106,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
- SegmentManager segmentManager
+ SegmentManager segmentManager,
+ ServerTypeConfig serverTypeConfig
)
{
this(
@@ -116,7 +119,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
Executors.newScheduledThreadPool(
config.getNumLoadingThreads(),
Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
- )
+ ),
+ serverTypeConfig
);
}
@@ -127,7 +131,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
- ScheduledExecutorService exec
+ ScheduledExecutorService exec,
+ ServerTypeConfig serverTypeConfig
)
{
this.jsonMapper = jsonMapper;
@@ -139,6 +144,13 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
this.exec = exec;
this.segmentsToDelete = new ConcurrentSkipListSet<>();
+ if (config.getLocations().isEmpty()) {
+ if (ServerType.HISTORICAL.equals(serverTypeConfig.getServerType())) {
+ throw new IAE("Segment cache locations must be set on historicals.");
+ } else {
+ log.info("Not starting SegmentLoadDropHandler with empty segment cache locations.");
+ }
+ }
requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
}
@@ -152,8 +164,10 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
log.info("Starting...");
try {
- loadLocalCache();
- serverAnnouncer.announce();
+ if (!config.getLocations().isEmpty()) {
+ loadLocalCache();
+ serverAnnouncer.announce();
+ }
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
@@ -174,7 +188,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
log.info("Stopping...");
try {
- serverAnnouncer.unannounce();
+ if (!config.getLocations().isEmpty()) {
+ serverAnnouncer.unannounce();
+ }
}
catch (Exception e) {
throw new RuntimeException(e);
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
index 42fb65a..0b860a1 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
@@ -63,6 +63,14 @@ public enum ServerType
{
return false;
}
+ },
+
+ BROKER {
+ @Override
+ public boolean isSegmentReplicationTarget()
+ {
+ return false;
+ }
};
/**
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
index d9fea81..889c167 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
/**
* This interface describes the coordinator balancing strategy, which is responsible for making decisions on where
@@ -56,11 +57,17 @@ public interface BalancerStrategy
/**
* Pick the best segment to move from one of the supplied set of servers according to the balancing strategy.
* @param serverHolders set of historicals to consider for moving segments
+ * @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules.
+ * Balancing strategies should avoid rebalancing segments for such datasources, since
+ * they should be loaded on all servers anyway.
+ * NOTE: this should really be handled on a per-segment basis, to properly support
+ * the interval or period-based broadcast rules. For simplicity of the initial
+ * implementation, only forever broadcast rules are supported.
* @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if
* there are no segments to pick from (i. e. all provided serverHolders are empty).
*/
@Nullable
- BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);
+ BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources);
/**
* Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java
index 4a1989d..1741087 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java
@@ -71,7 +71,7 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
@Override
public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
- if (server.segmentReplicatable()) {
+ if (server.isSegmentReplicationTarget()) {
clusterCostCacheBuilder.addSegment(server.getName(), segment);
}
return ServerView.CallbackAction.CONTINUE;
@@ -80,7 +80,7 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
@Override
public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
{
- if (server.segmentReplicatable()) {
+ if (server.isSegmentReplicationTarget()) {
clusterCostCacheBuilder.removeSegment(server.getName(), segment);
}
return ServerView.CallbackAction.CONTINUE;
@@ -98,7 +98,7 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
serverInventoryView.registerServerRemovedCallback(
executor,
server -> {
- if (server.segmentReplicatable()) {
+ if (server.isSegmentReplicationTarget()) {
clusterCostCacheBuilder.removeServer(server.getName());
}
return ServerView.CallbackAction.CONTINUE;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
index 4fd4164..5d656d6 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
@@ -35,6 +35,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -211,9 +212,12 @@ public class CostBalancerStrategy implements BalancerStrategy
@Override
- public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
+ public BalancerSegmentHolder pickSegmentToMove(
+ final List<ServerHolder> serverHolders,
+ Set<String> broadcastDatasources
+ )
{
- return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders);
+ return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
}
@Override
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
index 318f663..8fb4ccb 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
@@ -47,24 +47,28 @@ public class DruidCluster
@VisibleForTesting
static DruidCluster createDruidClusterFromBuilderInTest(
@Nullable Set<ServerHolder> realtimes,
- Map<String, Iterable<ServerHolder>> historicals
+ Map<String, Iterable<ServerHolder>> historicals,
+ @Nullable Set<ServerHolder> brokers
)
{
- return new DruidCluster(realtimes, historicals);
+ return new DruidCluster(realtimes, historicals, brokers);
}
private final Set<ServerHolder> realtimes;
private final Map<String, NavigableSet<ServerHolder>> historicals;
+ private final Set<ServerHolder> brokers;
public DruidCluster()
{
this.realtimes = new HashSet<>();
this.historicals = new HashMap<>();
+ this.brokers = new HashSet<>();
}
private DruidCluster(
@Nullable Set<ServerHolder> realtimes,
- Map<String, Iterable<ServerHolder>> historicals
+ Map<String, Iterable<ServerHolder>> historicals,
+ @Nullable Set<ServerHolder> brokers
)
{
this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes);
@@ -72,6 +76,7 @@ public class DruidCluster
historicals,
holders -> CollectionUtils.newTreeSet(Comparator.reverseOrder(), holders)
);
+ this.brokers = brokers == null ? new HashSet<>() : new HashSet<>(brokers);
}
public void add(ServerHolder serverHolder)
@@ -87,7 +92,11 @@ public class DruidCluster
addHistorical(serverHolder);
break;
case INDEXER_EXECUTOR:
- throw new IAE("unsupported server type[%s]", serverHolder.getServer().getType());
+ addRealtime(serverHolder);
+ break;
+ case BROKER:
+ addBroker(serverHolder);
+ break;
default:
throw new IAE("unknown server type[%s]", serverHolder.getServer().getType());
}
@@ -108,6 +117,11 @@ public class DruidCluster
tierServers.add(serverHolder);
}
+ private void addBroker(ServerHolder serverHolder)
+ {
+ brokers.add(serverHolder);
+ }
+
public Set<ServerHolder> getRealtimes()
{
return realtimes;
@@ -118,6 +132,12 @@ public class DruidCluster
return historicals;
}
+
+ public Set<ServerHolder> getBrokers()
+ {
+ return brokers;
+ }
+
public Iterable<String> getTierNames()
{
return historicals.keySet();
@@ -135,6 +155,7 @@ public class DruidCluster
final List<ServerHolder> allServers = new ArrayList<>(historicalSize + realtimeSize);
historicals.values().forEach(allServers::addAll);
+ allServers.addAll(brokers);
allServers.addAll(realtimes);
return allServers;
}
@@ -146,7 +167,7 @@ public class DruidCluster
public boolean isEmpty()
{
- return historicals.isEmpty() && realtimes.isEmpty();
+ return historicals.isEmpty() && realtimes.isEmpty() && brokers.isEmpty();
}
public boolean hasHistoricals()
@@ -159,9 +180,19 @@ public class DruidCluster
return !realtimes.isEmpty();
}
+ public boolean hasBrokers()
+ {
+ return !brokers.isEmpty();
+ }
+
public boolean hasTier(String tier)
{
- NavigableSet<ServerHolder> servers = historicals.get(tier);
- return (servers != null) && !servers.isEmpty();
+ NavigableSet<ServerHolder> historicalServers = historicals.get(tier);
+ boolean historicalsHasTier = (historicalServers != null) && !historicalServers.isEmpty();
+ if (historicalsHasTier) {
+ return true;
+ }
+
+ return false;
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index f8c3f43..36a414e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -761,7 +761,7 @@ public class DruidCoordinator
List<ImmutableDruidServer> currentServers = serverInventoryView
.getInventory()
.stream()
- .filter(DruidServer::segmentReplicatable)
+ .filter(DruidServer::isSegmentReplicationOrBroadcastTarget)
.map(DruidServer::toImmutableDruidServer)
.collect(Collectors.toList());
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index 7bae11e..3337b8e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -34,7 +34,9 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
@@ -70,6 +72,7 @@ public class DruidCoordinatorRuntimeParams
private final CoordinatorStats stats;
private final DateTime balancerReferenceTimestamp;
private final BalancerStrategy balancerStrategy;
+ private final Set<String> broadcastDatasources;
private DruidCoordinatorRuntimeParams(
long startTimeNanos,
@@ -85,7 +88,8 @@ public class DruidCoordinatorRuntimeParams
CoordinatorCompactionConfig coordinatorCompactionConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
- BalancerStrategy balancerStrategy
+ BalancerStrategy balancerStrategy,
+ Set<String> broadcastDatasources
)
{
this.startTimeNanos = startTimeNanos;
@@ -102,6 +106,7 @@ public class DruidCoordinatorRuntimeParams
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.balancerStrategy = balancerStrategy;
+ this.broadcastDatasources = broadcastDatasources;
}
public long getStartTimeNanos()
@@ -180,6 +185,11 @@ public class DruidCoordinatorRuntimeParams
return balancerStrategy;
}
+ public Set<String> getBroadcastDatasources()
+ {
+ return broadcastDatasources;
+ }
+
public boolean coordinatorIsLeadingEnoughTimeToMarkAsUnusedOvershadowedSegements()
{
long nanosElapsedSinceCoordinatorStart = System.nanoTime() - getStartTimeNanos();
@@ -256,6 +266,7 @@ public class DruidCoordinatorRuntimeParams
private CoordinatorStats stats;
private DateTime balancerReferenceTimestamp;
private BalancerStrategy balancerStrategy;
+ private Set<String> broadcastDatasources;
private Builder()
{
@@ -272,6 +283,7 @@ public class DruidCoordinatorRuntimeParams
this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build();
this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty();
this.balancerReferenceTimestamp = DateTimes.nowUtc();
+ this.broadcastDatasources = new HashSet<>();
}
Builder(
@@ -324,7 +336,8 @@ public class DruidCoordinatorRuntimeParams
coordinatorCompactionConfig,
stats,
balancerReferenceTimestamp,
- balancerStrategy
+ balancerStrategy,
+ broadcastDatasources
);
}
@@ -436,5 +449,11 @@ public class DruidCoordinatorRuntimeParams
this.balancerStrategy = balancerStrategy;
return this;
}
+
+ public Builder withBroadcastDatasources(Set<String> broadcastDatasources)
+ {
+ this.broadcastDatasources = broadcastDatasources;
+ return this;
+ }
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
index 8b0b306..72fdedf 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
public class RandomBalancerStrategy implements BalancerStrategy
@@ -51,9 +52,9 @@ public class RandomBalancerStrategy implements BalancerStrategy
}
@Override
- public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
+ public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources)
{
- return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders);
+ return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
}
@Override
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
index c2c4a7a..7181d52 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
@@ -22,19 +22,33 @@ package org.apache.druid.server.coordinator;
import org.apache.druid.timeline.DataSegment;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
final class ReservoirSegmentSampler
{
- static BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
+ static BalancerSegmentHolder getRandomBalancerSegmentHolder(
+ final List<ServerHolder> serverHolders,
+ Set<String> broadcastDatasources
+ )
{
ServerHolder fromServerHolder = null;
DataSegment proposalSegment = null;
int numSoFar = 0;
for (ServerHolder server : serverHolders) {
+ if (!server.getServer().getType().isSegmentReplicationTarget()) {
+ // if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do
+ continue;
+ }
+
for (DataSegment segment : server.getServer().iterateAllSegments()) {
+ if (broadcastDatasources.contains(segment.getDataSource())) {
+ // we don't need to rebalance segments that were assigned via broadcast rules
+ continue;
+ }
+
int randNum = ThreadLocalRandom.current().nextInt(numSoFar + 1);
// w.p. 1 / (numSoFar+1), swap out the server and segment
if (randNum == numSoFar) {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index ba96566..26fa9a5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -122,6 +122,11 @@ public class ServerHolder implements Comparable<ServerHolder>
return peon.getSegmentsToLoad().contains(segment);
}
+ public boolean isDroppingSegment(DataSegment segment)
+ {
+ return peon.getSegmentsToDrop().contains(segment);
+ }
+
public int getNumberOfSegmentsInQueue()
{
return peon.getNumberOfSegmentsInQueue();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
index d42ca63..a1c5237 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
@@ -187,7 +187,10 @@ public class BalanceSegments implements CoordinatorDuty
//noinspection ForLoopThatDoesntUseLoopVariable
for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
- final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom);
+ final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(
+ toMoveFrom,
+ params.getBroadcastDatasources()
+ );
if (segmentToMoveHolder == null) {
log.info("All servers to move segments from are empty, ending run.");
break;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
index 5288bb3..3dc7b4d 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
@@ -28,11 +28,13 @@ import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ReplicationThrottler;
+import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -101,6 +103,7 @@ public class RunRules implements CoordinatorDuty
final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
int missingRules = 0;
+ final Set<String> broadcastDatasources = new HashSet<>();
for (DataSegment segment : params.getUsedSegments()) {
if (overshadowed.contains(segment.getId())) {
// Skipping overshadowed segments
@@ -112,6 +115,12 @@ public class RunRules implements CoordinatorDuty
if (rule.appliesTo(segment, now)) {
stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment));
foundMatchingRule = true;
+
+ // The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules
+ // executes before BalanceSegments
+ if (rule instanceof BroadcastDistributionRule) {
+ broadcastDatasources.add(segment.getDataSource());
+ }
break;
}
}
@@ -131,6 +140,9 @@ public class RunRules implements CoordinatorDuty
.emit();
}
- return params.buildFromExisting().withCoordinatorStats(stats).build();
+ return params.buildFromExisting()
+ .withCoordinatorStats(stats)
+ .withBroadcastDatasources(broadcastDatasources)
+ .build();
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
index 6581712..35ff39e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator.rules;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@@ -27,8 +28,8 @@ import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
public abstract class BroadcastDistributionRule implements Rule
{
@@ -37,30 +38,35 @@ public abstract class BroadcastDistributionRule implements Rule
@Override
public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment)
{
- // Find servers which holds the segments of co-located data source
- final Set<ServerHolder> loadServerHolders = new HashSet<>();
final Set<ServerHolder> dropServerHolders = new HashSet<>();
- final List<String> colocatedDataSources = getColocatedDataSources();
- if (colocatedDataSources == null || colocatedDataSources.isEmpty()) {
- loadServerHolders.addAll(params.getDruidCluster().getAllServers());
- } else {
- params.getDruidCluster().getAllServers().forEach(
- eachHolder -> {
- if (!eachHolder.isDecommissioning()
- && colocatedDataSources.stream()
- .anyMatch(source -> eachHolder.getServer().getDataSource(source) != null)) {
- loadServerHolders.add(eachHolder);
- } else if (eachHolder.isServingSegment(segment)) {
- if (!eachHolder.getPeon().getSegmentsToDrop().contains(segment)) {
- dropServerHolders.add(eachHolder);
- }
- }
- }
- );
- }
- final CoordinatorStats stats = new CoordinatorStats();
+ // Find servers where we need to load the broadcast segments
+ final Set<ServerHolder> loadServerHolders =
+ params.getDruidCluster().getAllServers()
+ .stream()
+ .filter(
+ (serverHolder) -> {
+ ServerType serverType = serverHolder.getServer().getType();
+ if (!serverType.isSegmentBroadcastTarget()) {
+ return false;
+ }
+
+ final boolean isServingSegment =
+ serverHolder.isServingSegment(segment);
+
+ if (serverHolder.isDecommissioning()) {
+ if (isServingSegment && !serverHolder.isDroppingSegment(segment)) {
+ dropServerHolders.add(serverHolder);
+ }
+ return false;
+ }
+ return !isServingSegment && !serverHolder.isLoadingSegment(segment);
+ }
+ )
+ .collect(Collectors.toSet());
+
+ final CoordinatorStats stats = new CoordinatorStats();
return stats.accumulate(assign(loadServerHolders, segment))
.accumulate(drop(dropServerHolders, segment));
}
@@ -110,6 +116,4 @@ public abstract class BroadcastDistributionRule implements Rule
return stats;
}
-
- public abstract List<String> getColocatedDataSources();
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java
index d095f11..ef5094c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java
@@ -25,21 +25,16 @@ import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
-import java.util.List;
import java.util.Objects;
public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule
{
static final String TYPE = "broadcastForever";
- private final List<String> colocatedDataSources;
-
@JsonCreator
- public ForeverBroadcastDistributionRule(
- @JsonProperty("colocatedDataSources") List<String> colocatedDataSources
- )
+ public ForeverBroadcastDistributionRule()
{
- this.colocatedDataSources = colocatedDataSources;
+
}
@Override
@@ -50,13 +45,6 @@ public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule
}
@Override
- @JsonProperty
- public List<String> getColocatedDataSources()
- {
- return colocatedDataSources;
- }
-
- @Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return true;
@@ -79,13 +67,12 @@ public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule
return false;
}
- ForeverBroadcastDistributionRule that = (ForeverBroadcastDistributionRule) o;
- return Objects.equals(colocatedDataSources, that.colocatedDataSources);
+ return true;
}
@Override
public int hashCode()
{
- return Objects.hash(getType(), colocatedDataSources);
+ return Objects.hash(getType());
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java
index c40dff7..b1bf29e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java
@@ -25,23 +25,19 @@ import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
-import java.util.List;
import java.util.Objects;
public class IntervalBroadcastDistributionRule extends BroadcastDistributionRule
{
static final String TYPE = "broadcastByInterval";
private final Interval interval;
- private final List<String> colocatedDataSources;
@JsonCreator
public IntervalBroadcastDistributionRule(
- @JsonProperty("interval") Interval interval,
- @JsonProperty("colocatedDataSources") List<String> colocatedDataSources
+ @JsonProperty("interval") Interval interval
)
{
this.interval = interval;
- this.colocatedDataSources = colocatedDataSources;
}
@Override
@@ -52,13 +48,6 @@ public class IntervalBroadcastDistributionRule extends BroadcastDistributionRule
}
@Override
- @JsonProperty
- public List<String> getColocatedDataSources()
- {
- return colocatedDataSources;
- }
-
- @Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return appliesTo(segment.getInterval(), referenceTimestamp);
@@ -79,26 +68,19 @@ public class IntervalBroadcastDistributionRule extends BroadcastDistributionRule
@Override
public boolean equals(Object o)
{
- if (o == this) {
+ if (this == o) {
return true;
}
-
- if (o == null || o.getClass() != getClass()) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
-
IntervalBroadcastDistributionRule that = (IntervalBroadcastDistributionRule) o;
-
- if (!Objects.equals(interval, that.interval)) {
- return false;
- }
-
- return Objects.equals(colocatedDataSources, that.colocatedDataSources);
+ return Objects.equals(getInterval(), that.getInterval());
}
@Override
public int hashCode()
{
- return Objects.hash(getType(), interval, colocatedDataSources);
+ return Objects.hash(getInterval());
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java
index 97c6e11..d48353d 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java
@@ -26,7 +26,6 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
-import java.util.List;
import java.util.Objects;
public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
@@ -36,18 +35,15 @@ public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
private final Period period;
private final boolean includeFuture;
- private final List<String> colocatedDataSources;
@JsonCreator
public PeriodBroadcastDistributionRule(
@JsonProperty("period") Period period,
- @JsonProperty("includeFuture") Boolean includeFuture,
- @JsonProperty("colocatedDataSources") List<String> colocatedDataSources
+ @JsonProperty("includeFuture") Boolean includeFuture
)
{
this.period = period;
this.includeFuture = includeFuture == null ? DEFAULT_INCLUDE_FUTURE : includeFuture;
- this.colocatedDataSources = colocatedDataSources;
}
@Override
@@ -58,13 +54,6 @@ public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
}
@Override
- @JsonProperty
- public List<String> getColocatedDataSources()
- {
- return colocatedDataSources;
- }
-
- @Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return appliesTo(segment.getInterval(), referenceTimestamp);
@@ -94,25 +83,17 @@ public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
if (this == o) {
return true;
}
-
- if (o == null || o.getClass() != getClass()) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
-
PeriodBroadcastDistributionRule that = (PeriodBroadcastDistributionRule) o;
-
- if (!Objects.equals(period, that.period)) {
- return false;
- }
- if (includeFuture != that.includeFuture) {
- return false;
- }
- return Objects.equals(colocatedDataSources, that.colocatedDataSources);
+ return isIncludeFuture() == that.isIncludeFuture() &&
+ Objects.equals(getPeriod(), that.getPeriod());
}
@Override
public int hashCode()
{
- return Objects.hash(getType(), period, colocatedDataSources);
+ return Objects.hash(getPeriod(), isIncludeFuture());
}
}
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 0402975..b6d310f 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -769,7 +769,7 @@ public class DataSourcesResource
&& segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber()
&& segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
&& Iterables.any(
- segmentLoadInfo.getServers(), DruidServerMetadata::segmentReplicatable
+ segmentLoadInfo.getServers(), DruidServerMetadata::isSegmentReplicationTarget
)) {
return true;
}
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
index 4715734..f5534bd 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
@@ -174,7 +174,7 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
)
);
- Assert.assertFalse(
+ Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index f5455fb..6d8ef0a 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@@ -32,6 +34,7 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.loading.CacheTestSegmentLoader;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -39,12 +42,16 @@ import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -67,6 +74,7 @@ public class SegmentLoadDropHandlerTest
private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
private SegmentLoadDropHandler segmentLoadDropHandler;
+
private DataSegmentAnnouncer announcer;
private File infoDir;
private AtomicInteger announceCount;
@@ -74,22 +82,36 @@ public class SegmentLoadDropHandlerTest
private CacheTestSegmentLoader segmentLoader;
private SegmentManager segmentManager;
private List<Runnable> scheduledRunnable;
+ private SegmentLoaderConfig segmentLoaderConfig;
+ private SegmentLoaderConfig segmentLoaderConfigNoLocations;
+ private ScheduledExecutorFactory scheduledExecutorFactory;
+ private List<StorageLocationConfig> locations;
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void setUp()
{
try {
- infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest");
- infoDir.mkdirs();
- for (File file : infoDir.listFiles()) {
- file.delete();
- }
+ infoDir = temporaryFolder.newFolder();
log.info("Creating tmp test files in [%s]", infoDir);
}
catch (IOException e) {
throw new RuntimeException(e);
}
+ locations = Collections.singletonList(
+ new StorageLocationConfig(
+ infoDir,
+ 100L,
+ 100d
+ )
+ );
+
scheduledRunnable = new ArrayList<>();
segmentLoader = new CacheTestSegmentLoader();
@@ -132,57 +154,91 @@ public class SegmentLoadDropHandlerTest
}
};
- segmentLoadDropHandler = new SegmentLoadDropHandler(
- jsonMapper,
- new SegmentLoaderConfig()
- {
- @Override
- public File getInfoDir()
- {
- return infoDir;
- }
- @Override
- public int getNumLoadingThreads()
- {
- return 5;
- }
+ segmentLoaderConfig = new SegmentLoaderConfig()
+ {
+ @Override
+ public File getInfoDir()
+ {
+ return infoDir;
+ }
- @Override
- public int getAnnounceIntervalMillis()
- {
- return 50;
- }
+ @Override
+ public int getNumLoadingThreads()
+ {
+ return 5;
+ }
+
+ @Override
+ public int getAnnounceIntervalMillis()
+ {
+ return 50;
+ }
+
+ @Override
+ public List<StorageLocationConfig> getLocations()
+ {
+ return locations;
+ }
+
+ @Override
+ public int getDropSegmentDelayMillis()
+ {
+ return 0;
+ }
+ };
+
+ segmentLoaderConfigNoLocations = new SegmentLoaderConfig()
+ {
+ @Override
+ public int getNumLoadingThreads()
+ {
+ return 5;
+ }
+
+ @Override
+ public int getAnnounceIntervalMillis()
+ {
+ return 50;
+ }
+
+ @Override
+ public int getDropSegmentDelayMillis()
+ {
+ return 0;
+ }
+ };
+
+ scheduledExecutorFactory = new ScheduledExecutorFactory()
+ {
+ @Override
+ public ScheduledExecutorService create(int corePoolSize, String nameFormat)
+ {
+ /*
+ Override normal behavoir by adding the runnable to a list so that you can make sure
+ all the shceduled runnables are executed by explicitly calling run() on each item in the list
+ */
+ return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat))
+ {
@Override
- public int getDropSegmentDelayMillis()
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
{
- return 0;
+ scheduledRunnable.add(command);
+ return null;
}
- },
+ };
+ }
+ };
+
+ segmentLoadDropHandler = new SegmentLoadDropHandler(
+ jsonMapper,
+ segmentLoaderConfig,
announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
segmentManager,
- new ScheduledExecutorFactory()
- {
- @Override
- public ScheduledExecutorService create(int corePoolSize, String nameFormat)
- {
- /*
- Override normal behavoir by adding the runnable to a list so that you can make sure
- all the shceduled runnables are executed by explicitly calling run() on each item in the list
- */
- return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat))
- {
- @Override
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
- {
- scheduledRunnable.add(command);
- return null;
- }
- };
- }
- }.create(5, "SegmentLoadDropHandlerTest-[%d]")
+ scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
+ new ServerTypeConfig(ServerType.HISTORICAL)
);
}
@@ -220,6 +276,40 @@ public class SegmentLoadDropHandlerTest
segmentLoadDropHandler.stop();
}
+ @Test
+ public void testSegmentLoading1BrokerWithNoLocations() throws Exception
+ {
+ SegmentLoadDropHandler segmentLoadDropHandlerBrokerWithNoLocations = new SegmentLoadDropHandler(
+ jsonMapper,
+ segmentLoaderConfigNoLocations,
+ announcer,
+ EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
+ segmentManager,
+ scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-brokerNoLocations-[%d]"),
+ new ServerTypeConfig(ServerType.BROKER)
+ );
+
+ segmentLoadDropHandlerBrokerWithNoLocations.start();
+ segmentLoadDropHandler.stop();
+ }
+
+ @Test
+ public void testSegmentLoading1HistoricalWithNoLocations()
+ {
+ expectedException.expect(IAE.class);
+ expectedException.expectMessage("Segment cache locations must be set on historicals.");
+
+ new SegmentLoadDropHandler(
+ jsonMapper,
+ segmentLoaderConfigNoLocations,
+ announcer,
+ EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
+ segmentManager,
+ scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
+ new ServerTypeConfig(ServerType.HISTORICAL)
+ );
+ }
+
/**
* Steps:
* 1. addSegment() succesfully loads the segment and annouces it
@@ -383,12 +473,19 @@ public class SegmentLoadDropHandlerTest
}
@Override
+ public List<StorageLocationConfig> getLocations()
+ {
+ return locations;
+ }
+
+ @Override
public int getAnnounceIntervalMillis()
{
return 50;
}
},
- announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager
+ announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager,
+ new ServerTypeConfig(ServerType.HISTORICAL)
);
Set<DataSegment> segments = new HashSet<>();
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
index 9bdd84e..c30a37c 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
@@ -23,10 +23,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -37,9 +40,15 @@ import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
@@ -47,6 +56,8 @@ import java.util.concurrent.ScheduledExecutorService;
*/
public class ZkCoordinatorTest extends CuratorTestBase
{
+ private static final Logger log = new Logger(ZkCoordinatorTest.class);
+
private final ObjectMapper jsonMapper = ServerTestHelper.MAPPER;
private final DruidServerMetadata me = new DruidServerMetadata(
"dummyServer",
@@ -67,9 +78,31 @@ public class ZkCoordinatorTest extends CuratorTestBase
};
private ZkCoordinator zkCoordinator;
+ private File infoDir;
+ private List<StorageLocationConfig> locations;
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
@Before
public void setUp() throws Exception
{
+ try {
+ infoDir = temporaryFolder.newFolder();
+ log.info("Creating tmp test files in [%s]", infoDir);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ locations = Collections.singletonList(
+ new StorageLocationConfig(
+ infoDir,
+ 100L,
+ 100d
+ )
+ );
+
setupServerAndCurator();
curator.start();
curator.blockUntilConnected();
@@ -102,11 +135,42 @@ public class ZkCoordinatorTest extends CuratorTestBase
SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
ServerTestHelper.MAPPER,
- new SegmentLoaderConfig(),
+ new SegmentLoaderConfig() {
+ @Override
+ public File getInfoDir()
+ {
+ return infoDir;
+ }
+
+ @Override
+ public int getNumLoadingThreads()
+ {
+ return 5;
+ }
+
+ @Override
+ public int getAnnounceIntervalMillis()
+ {
+ return 50;
+ }
+
+ @Override
+ public List<StorageLocationConfig> getLocations()
+ {
+ return locations;
+ }
+
+ @Override
+ public int getDropSegmentDelayMillis()
+ {
+ return 0;
+ }
+ },
EasyMock.createNiceMock(DataSegmentAnnouncer.class),
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
EasyMock.createNiceMock(SegmentManager.class),
- EasyMock.createNiceMock(ScheduledExecutorService.class)
+ EasyMock.createNiceMock(ScheduledExecutorService.class),
+ new ServerTypeConfig(ServerType.HISTORICAL)
)
{
@Override
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
index 084a119..f37c92c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
@@ -43,6 +43,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -66,9 +67,11 @@ public class BalanceSegmentsTest
private DataSegment segment2;
private DataSegment segment3;
private DataSegment segment4;
+ private DataSegment segment5;
private List<DataSegment> segments;
private ListeningExecutorService balancerStrategyExecutor;
private BalancerStrategy balancerStrategy;
+ private Set<String> broadcastDatasources;
@Before
public void setUp()
@@ -82,6 +85,7 @@ public class BalanceSegmentsTest
segment2 = EasyMock.createMock(DataSegment.class);
segment3 = EasyMock.createMock(DataSegment.class);
segment4 = EasyMock.createMock(DataSegment.class);
+ segment5 = EasyMock.createMock(DataSegment.class);
DateTime start1 = DateTimes.of("2012-01-01");
DateTime start2 = DateTimes.of("2012-02-01");
@@ -130,12 +134,24 @@ public class BalanceSegmentsTest
0,
8L
);
+ segment5 = new DataSegment(
+ "datasourceBroadcast",
+ new Interval(start2, start2.plusHours(1)),
+ version.toString(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ NoneShardSpec.instance(),
+ 0,
+ 8L
+ );
segments = new ArrayList<>();
segments.add(segment1);
segments.add(segment2);
segments.add(segment3);
segments.add(segment4);
+ segments.add(segment5);
peon1 = new LoadQueuePeonTester();
peon2 = new LoadQueuePeonTester();
@@ -147,6 +163,8 @@ public class BalanceSegmentsTest
balancerStrategyExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
+
+ broadcastDatasources = Collections.singleton("datasourceBroadcast");
}
@After
@@ -187,10 +205,11 @@ public class BalanceSegmentsTest
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
+ .withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
- Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+ Assert.assertEquals(3, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}
/**
@@ -213,10 +232,10 @@ public class BalanceSegmentsTest
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false))))
+ EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false)), broadcastDatasources))
.andReturn(new BalancerSegmentHolder(druidServer2, segment3))
.andReturn(new BalancerSegmentHolder(druidServer2, segment4));
- EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
+ EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.andReturn(new BalancerSegmentHolder(druidServer1, segment2));
@@ -237,6 +256,7 @@ public class BalanceSegmentsTest
.build() // ceil(3 * 0.6) = 2 segments from decommissioning servers
)
.withBalancerStrategy(strategy)
+ .withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
@@ -280,7 +300,7 @@ public class BalanceSegmentsTest
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
+ EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.andReturn(new BalancerSegmentHolder(druidServer1, segment2))
.andReturn(new BalancerSegmentHolder(druidServer2, segment3))
@@ -303,6 +323,7 @@ public class BalanceSegmentsTest
.build()
)
.withBalancerStrategy(strategy)
+ .withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
@@ -328,7 +349,7 @@ public class BalanceSegmentsTest
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
+ EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.anyTimes();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> {
@@ -343,6 +364,7 @@ public class BalanceSegmentsTest
ImmutableList.of(false, true)
)
.withBalancerStrategy(strategy)
+ .withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
@@ -362,7 +384,7 @@ public class BalanceSegmentsTest
ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
+ EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.once();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
@@ -377,6 +399,7 @@ public class BalanceSegmentsTest
)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(1).build())
.withBalancerStrategy(strategy)
+ .withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
@@ -412,6 +435,7 @@ public class BalanceSegmentsTest
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
+ .withBroadcastDatasources(broadcastDatasources)
.withDynamicConfigs(
CoordinatorDynamicConfig
.builder()
@@ -451,6 +475,7 @@ public class BalanceSegmentsTest
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
+ .withBroadcastDatasources(broadcastDatasources)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
2
@@ -542,6 +567,7 @@ public class BalanceSegmentsTest
)
.withUsedSegmentsInTest(segments)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
+ .withBroadcastDatasources(broadcastDatasources)
.withBalancerStrategy(balancerStrategy);
}
@@ -611,7 +637,7 @@ public class BalanceSegmentsTest
}
@Override
- public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
+ public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources)
{
return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size());
}
@@ -635,9 +661,9 @@ public class BalanceSegmentsTest
// either decommissioning servers list or acitve ones (ie servers list is [2] or [1, 3])
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true))))
+ EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true)), broadcastDatasources))
.andReturn(new BalancerSegmentHolder(druidServer2, segment2));
- EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
+ EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1));
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new ServerHolder(druidServer3, peon3))
@@ -656,6 +682,7 @@ public class BalanceSegmentsTest
.build()
)
.withBalancerStrategy(strategy)
+ .withBroadcastDatasources(broadcastDatasources)
.build();
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java
index 772b7ae..5fb1000 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java
@@ -35,6 +35,7 @@ public final class DruidClusterBuilder
private @Nullable Set<ServerHolder> realtimes = null;
private final Map<String, Iterable<ServerHolder>> historicals = new HashMap<>();
+ private @Nullable Set<ServerHolder> brokers = null;
private DruidClusterBuilder()
{
@@ -46,6 +47,12 @@ public final class DruidClusterBuilder
return this;
}
+ public DruidClusterBuilder withBrokers(ServerHolder... brokers)
+ {
+ this.brokers = new HashSet<>(Arrays.asList(brokers));
+ return this;
+ }
+
public DruidClusterBuilder addTier(String tierName, ServerHolder... historicals)
{
if (this.historicals.putIfAbsent(tierName, Arrays.asList(historicals)) != null) {
@@ -56,6 +63,6 @@ public final class DruidClusterBuilder
public DruidCluster build()
{
- return DruidCluster.createDruidClusterFromBuilderInTest(realtimes, historicals);
+ return DruidCluster.createDruidClusterFromBuilderInTest(realtimes, historicals, brokers);
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
index 73e829c..8aef2f2 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ImmutableDruidServerTests;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
@@ -136,6 +137,7 @@ public class ReservoirSegmentSamplerTest
@Test
public void getRandomBalancerSegmentHolderTest()
{
+ EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
@@ -143,6 +145,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
+ EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce();
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce();
@@ -151,6 +154,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
+ EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce();
EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce();
@@ -159,6 +163,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer3);
+ EasyMock.expect(druidServer4.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce();
EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer4.getCurrSize()).andReturn(30L).atLeastOnce();
@@ -186,7 +191,7 @@ public class ReservoirSegmentSamplerTest
Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
for (int i = 0; i < 5000; i++) {
- segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList).getSegment(), 1);
+ segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet()).getSegment(), 1);
}
for (DataSegment segment : segments) {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java
index e3b51a5..0dfe0ea 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.server.coordinator.rules;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
@@ -44,15 +43,15 @@ public class BroadcastDistributionRuleSerdeTest
public static List<Object[]> constructorFeeder()
{
return Lists.newArrayList(
- new Object[]{new ForeverBroadcastDistributionRule(ImmutableList.of("large_source1", "large_source2"))},
- new Object[]{new ForeverBroadcastDistributionRule(ImmutableList.of())},
- new Object[]{new ForeverBroadcastDistributionRule(null)},
- new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), ImmutableList.of("large_source"))},
- new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), ImmutableList.of())},
- new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), null)},
- new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, ImmutableList.of("large_source"))},
- new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, ImmutableList.of())},
- new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, null)}
+ new Object[]{new ForeverBroadcastDistributionRule()},
+ new Object[]{new ForeverBroadcastDistributionRule()},
+ new Object[]{new ForeverBroadcastDistributionRule()},
+ new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))},
+ new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))},
+ new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))},
+ new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)},
+ new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)},
+ new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)}
);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
index 70ec3eb..c2d4fd3 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.server.coordinator.rules;
-import com.google.common.collect.ImmutableList;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@@ -269,7 +268,7 @@ public class BroadcastDistributionRuleTest
public void testBroadcastToSingleDataSource()
{
final ForeverBroadcastDistributionRule rule =
- new ForeverBroadcastDistributionRule(ImmutableList.of("large_source"));
+ new ForeverBroadcastDistributionRule();
CoordinatorStats stats = rule.run(
null,
@@ -285,7 +284,7 @@ public class BroadcastDistributionRuleTest
smallSegment
);
- Assert.assertEquals(3L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
+ Assert.assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
Assert.assertFalse(stats.hasPerTierStats());
Assert.assertTrue(
@@ -295,10 +294,10 @@ public class BroadcastDistributionRuleTest
Assert.assertTrue(
holdersOfLargeSegments2.stream()
- .noneMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment))
+ .allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment))
);
- Assert.assertFalse(holderOfSmallSegment.getPeon().getSegmentsToLoad().contains(smallSegment));
+ Assert.assertTrue(holderOfSmallSegment.isServingSegment(smallSegment));
}
private static DruidCoordinatorRuntimeParams makeCoordinartorRuntimeParams(
@@ -331,7 +330,7 @@ public class BroadcastDistributionRuleTest
public void testBroadcastDecommissioning()
{
final ForeverBroadcastDistributionRule rule =
- new ForeverBroadcastDistributionRule(ImmutableList.of("large_source"));
+ new ForeverBroadcastDistributionRule();
CoordinatorStats stats = rule.run(
null,
@@ -356,7 +355,6 @@ public class BroadcastDistributionRuleTest
public void testBroadcastToMultipleDataSources()
{
final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(
- ImmutableList.of("large_source", "large_source2")
);
CoordinatorStats stats = rule.run(
@@ -392,7 +390,7 @@ public class BroadcastDistributionRuleTest
@Test
public void testBroadcastToAllServers()
{
- final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(null);
+ final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
CoordinatorStats stats = rule.run(
null,
@@ -408,14 +406,14 @@ public class BroadcastDistributionRuleTest
smallSegment
);
- Assert.assertEquals(6L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
+ Assert.assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
Assert.assertFalse(stats.hasPerTierStats());
Assert.assertTrue(
druidCluster
.getAllServers()
.stream()
- .allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment))
+ .allMatch(holder -> holder.isLoadingSegment(smallSegment) || holder.isServingSegment(smallSegment))
);
}
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 5badcb0..18d2813 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -33,6 +33,7 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig;
import org.apache.druid.client.selector.ServerSelectorStrategy;
import org.apache.druid.client.selector.TierSelectorStrategy;
+import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.CacheModule;
@@ -42,9 +43,11 @@ import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SegmentWranglerModule;
+import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.RetryQueryRunnerConfig;
@@ -52,7 +55,12 @@ import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.server.BrokerQueryResource;
import org.apache.druid.server.ClientInfoResource;
import org.apache.druid.server.ClientQuerySegmentWalker;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.BrokerResource;
+import org.apache.druid.server.http.HistoricalResource;
+import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
@@ -123,12 +131,19 @@ public class CliBroker extends ServerRunnable
Jerseys.addResource(binder, HttpServerInventoryViewResource.class);
LifecycleModule.register(binder, Server.class);
+ binder.bind(SegmentManager.class).in(LazySingleton.class);
+ binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
+ binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.BROKER));
+ Jerseys.addResource(binder, HistoricalResource.class);
+ Jerseys.addResource(binder, SegmentListerResource.class);
+
+ LifecycleModule.register(binder, ZkCoordinator.class);
bindNodeRoleAndAnnouncer(
binder,
DiscoverySideEffectsProvider
.builder(NodeRole.BROKER)
- .serviceClasses(ImmutableList.of(LookupNodeService.class))
+ .serviceClasses(ImmutableList.of(DataNodeService.class, LookupNodeService.class))
.useLegacyAnnouncer(true)
.build()
);
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 7483fec..d4a8df1 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -25,9 +25,9 @@ import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
-import com.google.inject.util.Providers;
import io.airlift.airline.Command;
import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.DruidServerConfig;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
@@ -43,6 +43,7 @@ import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.QueryablePeonModule;
@@ -60,12 +61,13 @@ import org.apache.druid.indexing.worker.http.ShuffleResource;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
-import org.apache.druid.segment.realtime.CliIndexerDataSegmentServerAnnouncerLifecycleHandler;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordination.ZkCoordinator;
+import org.apache.druid.server.http.HistoricalResource;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.initialization.jetty.CliIndexerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
@@ -138,14 +140,14 @@ public class CliIndexer extends ServerRunnable
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, SegmentListerResource.class);
-
- LifecycleModule.register(binder, CliIndexerDataSegmentServerAnnouncerLifecycleHandler.class);
-
Jerseys.addResource(binder, ShuffleResource.class);
LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
- binder.bind(SegmentLoadDropHandler.class).toProvider(Providers.of(null));
+ binder.bind(SegmentManager.class).in(LazySingleton.class);
+ binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
+ Jerseys.addResource(binder, HistoricalResource.class);
+ LifecycleModule.register(binder, ZkCoordinator.class);
bindNodeRoleAndAnnouncer(
binder,
@@ -186,11 +188,11 @@ public class CliIndexer extends ServerRunnable
@Provides
@LazySingleton
- public DataNodeService getDataNodeService()
+ public DataNodeService getDataNodeService(DruidServerConfig serverConfig)
{
return new DataNodeService(
DruidServer.DEFAULT_TIER,
- 0L,
+ serverConfig.getMaxSize(),
ServerType.INDEXER_EXECUTOR,
DruidServer.DEFAULT_PRIORITY
);
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 1160eb9..59ef48a 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -61,7 +61,6 @@ import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Parent;
import org.apache.druid.guice.annotations.Self;
-import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
@@ -109,15 +108,16 @@ import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffN
import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
+import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordination.ZkCoordinator;
+import org.apache.druid.server.http.HistoricalResource;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.eclipse.jetty.server.Server;
-import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -154,6 +154,14 @@ public class CliPeon extends GuiceRunnable
@Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
public String serverType = "indexer-executor";
+
+ /**
+ * If set to "true", the peon will bind classes necessary for loading broadcast segments. This is used for
+ * queryable tasks, such as streaming ingestion tasks.
+ */
+ @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments")
+ public String loadBroadcastSegments = "false";
+
private static final Logger log = new Logger(CliPeon.class);
@Inject
@@ -174,6 +182,7 @@ public class CliPeon extends GuiceRunnable
new JoinableFactoryModule(),
new Module()
{
+ @SuppressForbidden(reason = "System#out, System#err")
@Override
public void configure(Binder binder)
{
@@ -218,6 +227,13 @@ public class CliPeon extends GuiceRunnable
Jerseys.addResource(binder, SegmentListerResource.class);
binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType)));
LifecycleModule.register(binder, Server.class);
+
+ if ("true".equals(loadBroadcastSegments)) {
+ binder.bind(SegmentManager.class).in(LazySingleton.class);
+ binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
+ Jerseys.addResource(binder, HistoricalResource.class);
+ LifecycleModule.register(binder, ZkCoordinator.class);
+ }
}
@Provides
@@ -247,16 +263,6 @@ public class CliPeon extends GuiceRunnable
{
return task.getId();
}
-
- @Provides
- public SegmentListerResource getSegmentListerResource(
- @Json ObjectMapper jsonMapper,
- @Smile ObjectMapper smileMapper,
- @Nullable BatchDataSegmentAnnouncer announcer
- )
- {
- return new SegmentListerResource(jsonMapper, smileMapper, announcer, null);
- }
},
new QueryablePeonModule(),
new IndexingServiceFirehoseModule(),
diff --git a/services/src/test/java/org/apache/druid/cli/MainTest.java b/services/src/test/java/org/apache/druid/cli/MainTest.java
index 3e960f6..a1fbed5 100644
--- a/services/src/test/java/org/apache/druid/cli/MainTest.java
+++ b/services/src/test/java/org/apache/druid/cli/MainTest.java
@@ -50,7 +50,9 @@ public class MainTest
//new Object[]{new CliInternalHadoopIndexer()},
new Object[]{new CliMiddleManager()},
- new Object[]{new CliRouter()}
+ new Object[]{new CliRouter()},
+
+ new Object[]{new CliIndexer()}
);
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 9f087ea..54e7e5a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -355,7 +355,7 @@ public class DruidSchema extends AbstractSchema
AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null;
if (segmentMetadata == null) {
// segmentReplicatable is used to determine if segments are served by historical or realtime servers
- long isRealtime = server.segmentReplicatable() ? 0 : 1;
+ long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1;
segmentMetadata = AvailableSegmentMetadata.builder(
segment,
isRealtime,
@@ -366,7 +366,7 @@ public class DruidSchema extends AbstractSchema
// Unknown segment.
setAvailableSegmentMetadata(segment.getId(), segmentMetadata);
segmentsNeedingRefresh.add(segment.getId());
- if (!server.segmentReplicatable()) {
+ if (!server.isSegmentReplicationTarget()) {
log.debug("Added new mutable segment[%s].", segment.getId());
mutableSegments.add(segment.getId());
} else {
@@ -384,7 +384,7 @@ public class DruidSchema extends AbstractSchema
.withRealtime(recomputeIsRealtime(servers))
.build();
knownSegments.put(segment.getId(), metadataWithNumReplicas);
- if (server.segmentReplicatable()) {
+ if (server.isSegmentReplicationTarget()) {
// If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
// even if it's also available on non-replicatable (realtime) servers.
mutableSegments.remove(segment.getId());
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index e3579aa..0832858 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -11887,7 +11887,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
- public void testNestedGroupByOnInlineDataSourceWithFilterIsNotSupported(Map<String, Object> queryContext) throws Exception
+ public void testNestedGroupByOnInlineDataSourceWithFilter(Map<String, Object> queryContext) throws Exception
{
try {
testQuery(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org