You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2020/06/09 03:16:17 UTC

[druid] branch master updated: Load broadcast datasources on broker and tasks (#9971)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 771870a  Load broadcast datasources on broker and tasks (#9971)
771870a is described below

commit 771870ae2d312d643e6d98f3d0af8a9618af9681
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Mon Jun 8 20:15:59 2020 -0700

    Load broadcast datasources on broker and tasks (#9971)
    
    * Load broadcast datasources on broker and tasks
    
    * Add javadocs
    
    * Support HTTP segment management
    
    * Fix indexer maxSize
    
    * inspection fix
    
    * Make segment cache optional on non-historicals
    
    * Fix build
    
    * Fix inspections, some coverage, failed tests
    
    * More tests
    
    * Add CliIndexer to MainTest
    
    * Fix inspection
    
    * Rename UnprunedDataSegment to LoadableDataSegment
    
    * Address PR comments
    
    * Fix
---
 .../druid/indexing/kafka/KafkaIndexTask.java       |   6 +
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   1 +
 .../druid/indexing/kinesis/KinesisIndexTask.java   |   6 +
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   2 +-
 .../druid/indexing/common/task/AbstractTask.java   |   6 +
 .../task/AppenderatorDriverRealtimeIndexTask.java  |   6 +
 .../indexing/common/task/RealtimeIndexTask.java    |   6 +
 .../apache/druid/indexing/common/task/Task.java    |   5 +
 .../druid/indexing/overlord/ForkingTaskRunner.java |   7 +
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |   1 +
 .../druid/indexing/common/task/IndexTaskTest.java  |   2 +
 .../common/task/RealtimeIndexTaskTest.java         |   6 +
 server/pom.xml                                     |   4 -
 .../druid/client/CachingClusteredClient.java       |   4 +-
 .../java/org/apache/druid/client/DruidServer.java  |  14 +-
 .../discovery/DruidNodeDiscoveryProvider.java      |   3 +-
 .../druid/segment/loading/SegmentLoaderConfig.java |   5 +-
 .../loading/SegmentLoaderLocalCacheManager.java    |   1 -
 ...DataSegmentServerAnnouncerLifecycleHandler.java | 104 -----------
 .../CoordinatorBasedSegmentHandoffNotifier.java    |   2 +-
 .../server/coordination/DruidServerMetadata.java   |  12 +-
 .../server/coordination/LoadableDataSegment.java   |  81 +++++++++
 .../coordination/SegmentChangeRequestLoad.java     |  14 +-
 .../coordination/SegmentLoadDropHandler.java       |  28 ++-
 .../druid/server/coordination/ServerType.java      |   8 +
 .../druid/server/coordinator/BalancerStrategy.java |   9 +-
 .../CachingCostBalancerStrategyFactory.java        |   6 +-
 .../server/coordinator/CostBalancerStrategy.java   |   8 +-
 .../druid/server/coordinator/DruidCluster.java     |  45 ++++-
 .../druid/server/coordinator/DruidCoordinator.java |   2 +-
 .../coordinator/DruidCoordinatorRuntimeParams.java |  23 ++-
 .../server/coordinator/RandomBalancerStrategy.java |   5 +-
 .../coordinator/ReservoirSegmentSampler.java       |  16 +-
 .../druid/server/coordinator/ServerHolder.java     |   5 +
 .../server/coordinator/duty/BalanceSegments.java   |   5 +-
 .../druid/server/coordinator/duty/RunRules.java    |  14 +-
 .../rules/BroadcastDistributionRule.java           |  52 +++---
 .../rules/ForeverBroadcastDistributionRule.java    |  21 +--
 .../rules/IntervalBroadcastDistributionRule.java   |  28 +--
 .../rules/PeriodBroadcastDistributionRule.java     |  29 +---
 .../druid/server/http/DataSourcesResource.java     |   2 +-
 ...CoordinatorBasedSegmentHandoffNotifierTest.java |   2 +-
 .../coordination/SegmentLoadDropHandlerTest.java   | 193 ++++++++++++++++-----
 .../server/coordination/ZkCoordinatorTest.java     |  68 +++++++-
 .../server/coordinator/BalanceSegmentsTest.java    |  45 ++++-
 .../server/coordinator/DruidClusterBuilder.java    |   9 +-
 .../coordinator/ReservoirSegmentSamplerTest.java   |   7 +-
 .../rules/BroadcastDistributionRuleSerdeTest.java  |  19 +-
 .../rules/BroadcastDistributionRuleTest.java       |  18 +-
 .../main/java/org/apache/druid/cli/CliBroker.java  |  17 +-
 .../main/java/org/apache/druid/cli/CliIndexer.java |  20 ++-
 .../main/java/org/apache/druid/cli/CliPeon.java    |  32 ++--
 .../test/java/org/apache/druid/cli/MainTest.java   |   4 +-
 .../druid/sql/calcite/schema/DruidSchema.java      |   6 +-
 .../apache/druid/sql/calcite/CalciteQueryTest.java |   2 +-
 55 files changed, 699 insertions(+), 347 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 9f7f997..6bf4086 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -191,4 +191,10 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
   {
     return TYPE;
   }
+
+  @Override
+  public boolean supportsQueries()
+  {
+    return true;
+  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 46a6a04..eb41749 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -345,6 +345,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             INPUT_FORMAT
         )
     );
+    Assert.assertTrue(task.supportsQueries());
 
     final ListenableFuture<TaskStatus> future = runTask(task);
 
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index bfd3758..9eee3bf 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -137,6 +137,12 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
     return TYPE;
   }
 
+  @Override
+  public boolean supportsQueries()
+  {
+    return true;
+  }
+
   @VisibleForTesting
   AWSCredentialsConfig getAwsCredentialsConfig()
   {
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 142f81f..48ca434 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -347,8 +347,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
             null,
             false
         )
-
     );
+    Assert.assertTrue(task.supportsQueries());
 
     final ListenableFuture<TaskStatus> future = runTask(task);
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 40745bf..728f3de 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -146,6 +146,12 @@ public abstract class AbstractTask implements Task
   }
 
   @Override
+  public boolean supportsQueries()
+  {
+    return false;
+  }
+
+  @Override
   public String getClasspathPrefix()
   {
     return null;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index c96b7f5..9a786fa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -246,6 +246,12 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
   }
 
   @Override
+  public boolean supportsQueries()
+  {
+    return true;
+  }
+
+  @Override
   public boolean isReady(TaskActionClient taskActionClient)
   {
     return true;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
index ed2ddd2..055a3fe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
@@ -197,6 +197,12 @@ public class RealtimeIndexTask extends AbstractTask
   }
 
   @Override
+  public boolean supportsQueries()
+  {
+    return true;
+  }
+
+  @Override
   public boolean isReady(TaskActionClient taskActionClient)
   {
     return true;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index c069c73..4f18c81 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -147,6 +147,11 @@ public interface Task
   <T> QueryRunner<T> getQueryRunner(Query<T> query);
 
   /**
+   * @return true if this Task type is queryable, such as streaming ingestion tasks
+   */
+  boolean supportsQueries();
+
+  /**
    * Returns an extra classpath that should be prepended to the default classpath when running this task. If no
    * extra classpath should be prepended, this should return null or the empty string.
    */
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 2f1abc1..10f5e6b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -327,6 +327,13 @@ public class ForkingTaskRunner
                           command.add(nodeType);
                         }
 
+                        // If the task type is queryable, we need to load broadcast segments on the peon, used for
+                        // join queries
+                        if (task.supportsQueries()) {
+                          command.add("--loadBroadcastSegments");
+                          command.add("true");
+                        }
+
                         if (!taskFile.exists()) {
                           jsonMapper.writeValue(taskFile, task);
                         }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index de3fa29..513a590 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -337,6 +337,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
   {
     expectPublishedSegments(1);
     final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null);
+    Assert.assertTrue(task.supportsQueries());
     final ListenableFuture<TaskStatus> statusFuture = runTask(task);
 
     // Wait for firehose to show up, it starts off null.
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 8668a93..6e3fcf5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -229,6 +229,8 @@ public class IndexTaskTest extends IngestionTestBase
         appenderatorsManager
     );
 
+    Assert.assertFalse(indexTask.supportsQueries());
+
     final List<DataSegment> segments = runTask(indexTask).rhs;
 
     Assert.assertEquals(2, segments.size());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 8cc21d0..12ea214 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -189,6 +189,12 @@ public class RealtimeIndexTaskTest
     Assert.assertEquals(task.getId(), task.getTaskResource().getAvailabilityGroup());
   }
 
+  @Test(timeout = 60_000L)
+  public void testSupportsQueries()
+  {
+    final RealtimeIndexTask task = makeRealtimeTask(null);
+    Assert.assertTrue(task.supportsQueries());
+  }
 
   @Test(timeout = 60_000L, expected = ExecutionException.class)
   public void testHandoffTimeout() throws Exception
diff --git a/server/pom.xml b/server/pom.xml
index 5d68a91..ddce092 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -292,10 +292,6 @@
             <artifactId>validation-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.hibernate</groupId>
-            <artifactId>hibernate-validator</artifactId>
-        </dependency>
-        <dependency>
             <groupId>com.google.errorprone</groupId>
             <artifactId>error_prone_annotations</artifactId>
         </dependency>
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index ae4dc16..c382be3 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -452,7 +452,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
       Hasher hasher = Hashing.sha1().newHasher();
       boolean hasOnlyHistoricalSegments = true;
       for (SegmentServerSelector p : segments) {
-        if (!p.getServer().pick().getServer().segmentReplicatable()) {
+        if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) {
           hasOnlyHistoricalSegments = false;
           break;
         }
@@ -633,7 +633,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
 
         if (isBySegment) {
           serverResults = getBySegmentServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);
-        } else if (!server.segmentReplicatable() || !populateCache) {
+        } else if (!server.isSegmentReplicationTarget() || !populateCache) {
           serverResults = getSimpleServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);
         } else {
           serverResults = getAndCacheServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);
diff --git a/server/src/main/java/org/apache/druid/client/DruidServer.java b/server/src/main/java/org/apache/druid/client/DruidServer.java
index ddcba54..6c52866 100644
--- a/server/src/main/java/org/apache/druid/client/DruidServer.java
+++ b/server/src/main/java/org/apache/druid/client/DruidServer.java
@@ -137,9 +137,19 @@ public class DruidServer implements Comparable<DruidServer>
     return metadata.getTier();
   }
 
-  public boolean segmentReplicatable()
+  public boolean isSegmentReplicationTarget()
   {
-    return metadata.segmentReplicatable();
+    return metadata.isSegmentReplicationTarget();
+  }
+
+  public boolean isSegmentBroadcastTarget()
+  {
+    return metadata.isSegmentBroadcastTarget();
+  }
+
+  public boolean isSegmentReplicationOrBroadcastTarget()
+  {
+    return metadata.isSegmentReplicationTarget() || metadata.isSegmentBroadcastTarget();
   }
 
   @JsonProperty
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
index 7332029..898ce7c 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
@@ -44,7 +44,8 @@ public abstract class DruidNodeDiscoveryProvider
   private static final Map<String, Set<NodeRole>> SERVICE_TO_NODE_TYPES = ImmutableMap.of(
       LookupNodeService.DISCOVERY_SERVICE_KEY,
       ImmutableSet.of(NodeRole.BROKER, NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER),
-      DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER),
+      DataNodeService.DISCOVERY_SERVICE_KEY,
+      ImmutableSet.of(NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER, NodeRole.BROKER),
       WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.MIDDLE_MANAGER, NodeRole.INDEXER)
   );
 
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
index c6c5723..39b3bde 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
@@ -23,9 +23,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.druid.utils.JvmUtils;
-import org.hibernate.validator.constraints.NotEmpty;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -34,8 +34,7 @@ import java.util.concurrent.TimeUnit;
 public class SegmentLoaderConfig
 {
   @JsonProperty
-  @NotEmpty
-  private List<StorageLocationConfig> locations = null;
+  private List<StorageLocationConfig> locations = Collections.emptyList();
 
   @JsonProperty("lazyLoadOnStart")
   private boolean lazyLoadOnStart = false;
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 398ad67..b2ac7e8 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -89,7 +89,6 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
     this.indexIO = indexIO;
     this.config = config;
     this.jsonMapper = mapper;
-
     this.locations = new ArrayList<>();
     for (StorageLocationConfig locationConfig : config.getLocations()) {
       locations.add(
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java b/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java
deleted file mode 100644
index e874a30..0000000
--- a/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.realtime;
-
-import com.google.common.base.Throwables;
-import com.google.inject.Inject;
-import org.apache.druid.concurrent.LifecycleLock;
-import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
-
-import java.io.IOException;
-
-/**
- * Ties the {@link DataSegmentServerAnnouncer} announce/unannounce to the lifecycle start and stop.
- *
- * Analogous to {@link org.apache.druid.server.coordination.SegmentLoadDropHandler} on the Historicals,
- * but without segment cache management.
- */
-@ManageLifecycle
-public class CliIndexerDataSegmentServerAnnouncerLifecycleHandler
-{
-  private static final EmittingLogger LOG = new EmittingLogger(CliIndexerDataSegmentServerAnnouncerLifecycleHandler.class);
-
-  private final DataSegmentServerAnnouncer dataSegmentServerAnnouncer;
-
-  private final LifecycleLock lifecycleLock = new LifecycleLock();
-
-  @Inject
-  public CliIndexerDataSegmentServerAnnouncerLifecycleHandler(
-      DataSegmentServerAnnouncer dataSegmentServerAnnouncer
-  )
-  {
-    this.dataSegmentServerAnnouncer = dataSegmentServerAnnouncer;
-  }
-
-  @LifecycleStart
-  public void start() throws IOException
-  {
-    if (!lifecycleLock.canStart()) {
-      throw new RuntimeException("Lifecycle lock could not start");
-    }
-
-    try {
-      if (lifecycleLock.isStarted()) {
-        return;
-      }
-
-      LOG.info("Starting...");
-      try {
-        dataSegmentServerAnnouncer.announce();
-      }
-      catch (Exception e) {
-        Throwables.propagateIfPossible(e, IOException.class);
-        throw new RuntimeException(e);
-      }
-      LOG.info("Started.");
-      lifecycleLock.started();
-    }
-    finally {
-      lifecycleLock.exitStart();
-    }
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-    if (!lifecycleLock.canStop()) {
-      throw new RuntimeException("Lifecycle lock could not stop");
-    }
-
-    if (!lifecycleLock.isStarted()) {
-      return;
-    }
-
-    LOG.info("Stopping...");
-    try {
-      dataSegmentServerAnnouncer.unannounce();
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    LOG.info("Stopped.");
-  }
-}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
index bbee720..2e97258 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
@@ -142,7 +142,7 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
           && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
              == descriptor.getPartitionNumber()
           && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
-          && segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::segmentReplicatable)) {
+          && segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::isSegmentReplicationOrBroadcastTarget)) {
         return true;
       }
     }
diff --git a/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java b/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java
index e3673bb..3fda41b 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java
@@ -107,11 +107,21 @@ public class DruidServerMetadata
     return priority;
   }
 
-  public boolean segmentReplicatable()
+  public boolean isSegmentReplicationTarget()
   {
     return type.isSegmentReplicationTarget();
   }
 
+  public boolean isSegmentBroadcastTarget()
+  {
+    return type.isSegmentBroadcastTarget();
+  }
+
+  public boolean isSegmentReplicationOrBroadcastTarget()
+  {
+    return isSegmentReplicationTarget() || isSegmentBroadcastTarget();
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
new file mode 100644
index 0000000..4f4f7a5
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.druid.jackson.CommaListJoinDeserializer;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A deserialization aid used by {@link SegmentChangeRequestLoad}. The broker prunes the loadSpec from segments
+ * for efficiency reasons, but the broker does need the loadSpec when it loads broadcast segments.
+ *
+ * This class always uses the non-pruning default {@link PruneSpecsHolder}.
+ */
+public class LoadableDataSegment extends DataSegment
+{
+  @JsonCreator
+  public LoadableDataSegment(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("version") String version,
+      // use `Map` *NOT* `LoadSpec` because we want to do lazy materialization to prevent dependency pollution
+      @JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
+      @JsonProperty("dimensions")
+      @JsonDeserialize(using = CommaListJoinDeserializer.class)
+      @Nullable
+      List<String> dimensions,
+      @JsonProperty("metrics")
+      @JsonDeserialize(using = CommaListJoinDeserializer.class)
+      @Nullable
+      List<String> metrics,
+      @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
+      @JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
+      @JsonProperty("binaryVersion") Integer binaryVersion,
+      @JsonProperty("size") long size,
+      @JacksonInject PruneSpecsHolder pruneSpecsHolder
+  )
+  {
+    super(
+        dataSource,
+        interval,
+        version,
+        loadSpec,
+        dimensions,
+        metrics,
+        shardSpec,
+        lastCompactionState,
+        binaryVersion,
+        size,
+        PruneSpecsHolder.DEFAULT
+    );
+
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
index 097e025..130c7b5 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
@@ -35,14 +35,26 @@ public class SegmentChangeRequestLoad implements DataSegmentChangeRequest
 {
   private final DataSegment segment;
 
+  /**
+   * To avoid pruning of the loadSpec on the broker, needed when the broker is loading broadcast segments,
+   * we deserialize into a {@link LoadableDataSegment}, which never removes the loadSpec.
+   */
   @JsonCreator
   public SegmentChangeRequestLoad(
-      @JsonUnwrapped DataSegment segment
+      @JsonUnwrapped LoadableDataSegment segment
   )
   {
     this.segment = segment;
   }
 
+  public SegmentChangeRequestLoad(
+      DataSegment segment
+  )
+  {
+    this.segment = segment;
+  }
+
+
   @Override
   public void go(DataSegmentChangeHandler handler, @Nullable DataSegmentChangeCallback callback)
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 51aefc1..87a1936 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -34,6 +34,8 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.inject.Inject;
 import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -104,7 +106,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
       SegmentLoaderConfig config,
       DataSegmentAnnouncer announcer,
       DataSegmentServerAnnouncer serverAnnouncer,
-      SegmentManager segmentManager
+      SegmentManager segmentManager,
+      ServerTypeConfig serverTypeConfig
   )
   {
     this(
@@ -116,7 +119,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
         Executors.newScheduledThreadPool(
             config.getNumLoadingThreads(),
             Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
-        )
+        ),
+        serverTypeConfig
     );
   }
 
@@ -127,7 +131,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
       DataSegmentAnnouncer announcer,
       DataSegmentServerAnnouncer serverAnnouncer,
       SegmentManager segmentManager,
-      ScheduledExecutorService exec
+      ScheduledExecutorService exec,
+      ServerTypeConfig serverTypeConfig
   )
   {
     this.jsonMapper = jsonMapper;
@@ -139,6 +144,13 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
     this.exec = exec;
     this.segmentsToDelete = new ConcurrentSkipListSet<>();
 
+    if (config.getLocations().isEmpty()) {
+      if (ServerType.HISTORICAL.equals(serverTypeConfig.getServerType())) {
+        throw new IAE("Segment cache locations must be set on historicals.");
+      } else {
+        log.info("Not starting SegmentLoadDropHandler with empty segment cache locations.");
+      }
+    }
     requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
   }
 
@@ -152,8 +164,10 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
 
       log.info("Starting...");
       try {
-        loadLocalCache();
-        serverAnnouncer.announce();
+        if (!config.getLocations().isEmpty()) {
+          loadLocalCache();
+          serverAnnouncer.announce();
+        }
       }
       catch (Exception e) {
         Throwables.propagateIfPossible(e, IOException.class);
@@ -174,7 +188,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
 
       log.info("Stopping...");
       try {
-        serverAnnouncer.unannounce();
+        if (!config.getLocations().isEmpty()) {
+          serverAnnouncer.unannounce();
+        }
       }
       catch (Exception e) {
         throw new RuntimeException(e);
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
index 42fb65a..0b860a1 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
@@ -63,6 +63,14 @@ public enum ServerType
     {
       return false;
     }
+  },
+
+  BROKER {
+    @Override
+    public boolean isSegmentReplicationTarget()
+    {
+      return false;
+    }
   };
 
   /**
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
index d9fea81..889c167 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Set;
 
 /**
  * This interface describes the coordinator balancing strategy, which is responsible for making decisions on where
@@ -56,11 +57,17 @@ public interface BalancerStrategy
   /**
    * Pick the best segment to move from one of the supplied set of servers according to the balancing strategy.
    * @param serverHolders set of historicals to consider for moving segments
+   * @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules.
+   *                             Balancing strategies should avoid rebalancing segments for such datasources, since
+   *                             they should be loaded on all servers anyway.
+   *                             NOTE: this should really be handled on a per-segment basis, to properly support
+   *                                   the interval or period-based broadcast rules. For simplicity of the initial
+   *                                   implementation, only forever broadcast rules are supported.
    * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if
    *         there are no segments to pick from (i. e. all provided serverHolders are empty).
    */
   @Nullable
-  BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);
+  BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources);
 
   /**
    * Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java
index 4a1989d..1741087 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java
@@ -71,7 +71,7 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
           @Override
           public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
           {
-            if (server.segmentReplicatable()) {
+            if (server.isSegmentReplicationTarget()) {
               clusterCostCacheBuilder.addSegment(server.getName(), segment);
             }
             return ServerView.CallbackAction.CONTINUE;
@@ -80,7 +80,7 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
           @Override
           public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
           {
-            if (server.segmentReplicatable()) {
+            if (server.isSegmentReplicationTarget()) {
               clusterCostCacheBuilder.removeSegment(server.getName(), segment);
             }
             return ServerView.CallbackAction.CONTINUE;
@@ -98,7 +98,7 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
     serverInventoryView.registerServerRemovedCallback(
         executor,
         server -> {
-          if (server.segmentReplicatable()) {
+          if (server.isSegmentReplicationTarget()) {
             clusterCostCacheBuilder.removeServer(server.getName());
           }
           return ServerView.CallbackAction.CONTINUE;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
index 4fd4164..5d656d6 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
@@ -35,6 +35,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
@@ -211,9 +212,12 @@ public class CostBalancerStrategy implements BalancerStrategy
 
 
   @Override
-  public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
+  public BalancerSegmentHolder pickSegmentToMove(
+      final List<ServerHolder> serverHolders,
+      Set<String> broadcastDatasources
+  )
   {
-    return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders);
+    return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
index 318f663..8fb4ccb 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
@@ -47,24 +47,28 @@ public class DruidCluster
   @VisibleForTesting
   static DruidCluster createDruidClusterFromBuilderInTest(
       @Nullable Set<ServerHolder> realtimes,
-      Map<String, Iterable<ServerHolder>> historicals
+      Map<String, Iterable<ServerHolder>> historicals,
+      @Nullable Set<ServerHolder> brokers
   )
   {
-    return new DruidCluster(realtimes, historicals);
+    return new DruidCluster(realtimes, historicals, brokers);
   }
 
   private final Set<ServerHolder> realtimes;
   private final Map<String, NavigableSet<ServerHolder>> historicals;
+  private final Set<ServerHolder> brokers;
 
   public DruidCluster()
   {
     this.realtimes = new HashSet<>();
     this.historicals = new HashMap<>();
+    this.brokers = new HashSet<>();
   }
 
   private DruidCluster(
       @Nullable Set<ServerHolder> realtimes,
-      Map<String, Iterable<ServerHolder>> historicals
+      Map<String, Iterable<ServerHolder>> historicals,
+      @Nullable Set<ServerHolder> brokers
   )
   {
     this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes);
@@ -72,6 +76,7 @@ public class DruidCluster
         historicals,
         holders -> CollectionUtils.newTreeSet(Comparator.reverseOrder(), holders)
     );
+    this.brokers = brokers == null ? new HashSet<>() : new HashSet<>(brokers);
   }
 
   public void add(ServerHolder serverHolder)
@@ -87,7 +92,11 @@ public class DruidCluster
         addHistorical(serverHolder);
         break;
       case INDEXER_EXECUTOR:
-        throw new IAE("unsupported server type[%s]", serverHolder.getServer().getType());
+        addRealtime(serverHolder);
+        break;
+      case BROKER:
+        addBroker(serverHolder);
+        break;
       default:
         throw new IAE("unknown server type[%s]", serverHolder.getServer().getType());
     }
@@ -108,6 +117,11 @@ public class DruidCluster
     tierServers.add(serverHolder);
   }
 
+  private void addBroker(ServerHolder serverHolder)
+  {
+    brokers.add(serverHolder);
+  }
+
   public Set<ServerHolder> getRealtimes()
   {
     return realtimes;
@@ -118,6 +132,12 @@ public class DruidCluster
     return historicals;
   }
 
+  /** Returns the broker holders; this is the cluster's own backing set, not a copy. */
+  public Set<ServerHolder> getBrokers()
+  {
+    return brokers;
+  }
+
   public Iterable<String> getTierNames()
   {
     return historicals.keySet();
@@ -135,6 +155,7 @@ public class DruidCluster
     final List<ServerHolder> allServers = new ArrayList<>(historicalSize + realtimeSize);
 
     historicals.values().forEach(allServers::addAll);
+    allServers.addAll(brokers);
     allServers.addAll(realtimes);
     return allServers;
   }
@@ -146,7 +167,7 @@ public class DruidCluster
 
   public boolean isEmpty()
   {
-    return historicals.isEmpty() && realtimes.isEmpty();
+    return historicals.isEmpty() && realtimes.isEmpty() && brokers.isEmpty();
   }
 
   public boolean hasHistoricals()
@@ -159,9 +180,19 @@ public class DruidCluster
     return !realtimes.isEmpty();
   }
 
+  public boolean hasBrokers()
+  {
+    return !brokers.isEmpty();
+  }
+
   public boolean hasTier(String tier)
   {
-    NavigableSet<ServerHolder> servers = historicals.get(tier);
-    return (servers != null) && !servers.isEmpty();
+    NavigableSet<ServerHolder> historicalServers = historicals.get(tier);
+    boolean historicalsHasTier = (historicalServers != null) && !historicalServers.isEmpty();
+    if (historicalsHasTier) {
+      return true;
+    }
+
+    return false;
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index f8c3f43..36a414e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -761,7 +761,7 @@ public class DruidCoordinator
       List<ImmutableDruidServer> currentServers = serverInventoryView
           .getInventory()
           .stream()
-          .filter(DruidServer::segmentReplicatable)
+          .filter(DruidServer::isSegmentReplicationOrBroadcastTarget)
           .map(DruidServer::toImmutableDruidServer)
           .collect(Collectors.toList());
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index 7bae11e..3337b8e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -34,7 +34,9 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
@@ -70,6 +72,7 @@ public class DruidCoordinatorRuntimeParams
   private final CoordinatorStats stats;
   private final DateTime balancerReferenceTimestamp;
   private final BalancerStrategy balancerStrategy;
+  private final Set<String> broadcastDatasources;
 
   private DruidCoordinatorRuntimeParams(
       long startTimeNanos,
@@ -85,7 +88,8 @@ public class DruidCoordinatorRuntimeParams
       CoordinatorCompactionConfig coordinatorCompactionConfig,
       CoordinatorStats stats,
       DateTime balancerReferenceTimestamp,
-      BalancerStrategy balancerStrategy
+      BalancerStrategy balancerStrategy,
+      Set<String> broadcastDatasources
   )
   {
     this.startTimeNanos = startTimeNanos;
@@ -102,6 +106,7 @@ public class DruidCoordinatorRuntimeParams
     this.stats = stats;
     this.balancerReferenceTimestamp = balancerReferenceTimestamp;
     this.balancerStrategy = balancerStrategy;
+    this.broadcastDatasources = broadcastDatasources;
   }
 
   public long getStartTimeNanos()
@@ -180,6 +185,11 @@ public class DruidCoordinatorRuntimeParams
     return balancerStrategy;
   }
 
+  public Set<String> getBroadcastDatasources()
+  {
+    return broadcastDatasources == null ? new HashSet<>() : broadcastDatasources; // null if a Builder never set it
+  }
+
   public boolean coordinatorIsLeadingEnoughTimeToMarkAsUnusedOvershadowedSegements()
   {
     long nanosElapsedSinceCoordinatorStart = System.nanoTime() - getStartTimeNanos();
@@ -256,6 +266,7 @@ public class DruidCoordinatorRuntimeParams
     private CoordinatorStats stats;
     private DateTime balancerReferenceTimestamp;
     private BalancerStrategy balancerStrategy;
+    private Set<String> broadcastDatasources;
 
     private Builder()
     {
@@ -272,6 +283,7 @@ public class DruidCoordinatorRuntimeParams
       this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build();
       this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty();
       this.balancerReferenceTimestamp = DateTimes.nowUtc();
+      this.broadcastDatasources = new HashSet<>();
     }
 
     Builder(
@@ -324,7 +336,8 @@ public class DruidCoordinatorRuntimeParams
           coordinatorCompactionConfig,
           stats,
           balancerReferenceTimestamp,
-          balancerStrategy
+          balancerStrategy,
+          broadcastDatasources
       );
     }
 
@@ -436,5 +449,11 @@ public class DruidCoordinatorRuntimeParams
       this.balancerStrategy = balancerStrategy;
       return this;
     }
+
+    public Builder withBroadcastDatasources(Set<String> broadcastDatasources)
+    {
+      this.broadcastDatasources = broadcastDatasources;
+      return this;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
index 8b0b306..72fdedf 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 
 public class RandomBalancerStrategy implements BalancerStrategy
@@ -51,9 +52,9 @@ public class RandomBalancerStrategy implements BalancerStrategy
   }
 
   @Override
-  public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
+  public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources)
   {
-    return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders);
+    return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
index c2c4a7a..7181d52 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
@@ -22,19 +22,33 @@ package org.apache.druid.server.coordinator;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 
 final class ReservoirSegmentSampler
 {
 
-  static BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
+  static BalancerSegmentHolder getRandomBalancerSegmentHolder(
+      final List<ServerHolder> serverHolders,
+      Set<String> broadcastDatasources
+  )
   {
     ServerHolder fromServerHolder = null;
     DataSegment proposalSegment = null;
     int numSoFar = 0;
 
     for (ServerHolder server : serverHolders) {
+      if (!server.getServer().getType().isSegmentReplicationTarget()) {
+        // if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do
+        continue;
+      }
+
       for (DataSegment segment : server.getServer().iterateAllSegments()) {
+        if (broadcastDatasources.contains(segment.getDataSource())) {
+          // we don't need to rebalance segments that were assigned via broadcast rules
+          continue;
+        }
+
         int randNum = ThreadLocalRandom.current().nextInt(numSoFar + 1);
         // w.p. 1 / (numSoFar+1), swap out the server and segment
         if (randNum == numSoFar) {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index ba96566..26fa9a5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -122,6 +122,11 @@ public class ServerHolder implements Comparable<ServerHolder>
     return peon.getSegmentsToLoad().contains(segment);
   }
 
+  public boolean isDroppingSegment(DataSegment segment)
+  {
+    return peon.getSegmentsToDrop().contains(segment);
+  }
+
   public int getNumberOfSegmentsInQueue()
   {
     return peon.getNumberOfSegmentsInQueue();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
index d42ca63..a1c5237 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
@@ -187,7 +187,10 @@ public class BalanceSegments implements CoordinatorDuty
 
     //noinspection ForLoopThatDoesntUseLoopVariable
     for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
-      final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom);
+      final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(
+          toMoveFrom,
+          params.getBroadcastDatasources()
+      );
       if (segmentToMoveHolder == null) {
         log.info("All servers to move segments from are empty, ending run.");
         break;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
index 5288bb3..3dc7b4d 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
@@ -28,11 +28,13 @@ import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ReplicationThrottler;
+import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.DateTime;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -101,6 +103,7 @@ public class RunRules implements CoordinatorDuty
 
     final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
     int missingRules = 0;
+    final Set<String> broadcastDatasources = new HashSet<>();
     for (DataSegment segment : params.getUsedSegments()) {
       if (overshadowed.contains(segment.getId())) {
         // Skipping overshadowed segments
@@ -112,6 +115,12 @@ public class RunRules implements CoordinatorDuty
         if (rule.appliesTo(segment, now)) {
           stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment));
           foundMatchingRule = true;
+
+          // The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules
+          // executes before BalanceSegments
+          if (rule instanceof BroadcastDistributionRule) {
+            broadcastDatasources.add(segment.getDataSource());
+          }
           break;
         }
       }
@@ -131,6 +140,9 @@ public class RunRules implements CoordinatorDuty
          .emit();
     }
 
-    return params.buildFromExisting().withCoordinatorStats(stats).build();
+    return params.buildFromExisting()
+                 .withCoordinatorStats(stats)
+                 .withBroadcastDatasources(broadcastDatasources)
+                 .build();
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
index 6581712..35ff39e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.coordinator.rules;
 
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@@ -27,8 +28,8 @@ import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public abstract class BroadcastDistributionRule implements Rule
 {
@@ -37,30 +38,35 @@ public abstract class BroadcastDistributionRule implements Rule
   @Override
   public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment)
   {
-    // Find servers which holds the segments of co-located data source
-    final Set<ServerHolder> loadServerHolders = new HashSet<>();
     final Set<ServerHolder> dropServerHolders = new HashSet<>();
-    final List<String> colocatedDataSources = getColocatedDataSources();
-    if (colocatedDataSources == null || colocatedDataSources.isEmpty()) {
-      loadServerHolders.addAll(params.getDruidCluster().getAllServers());
-    } else {
-      params.getDruidCluster().getAllServers().forEach(
-          eachHolder -> {
-            if (!eachHolder.isDecommissioning()
-                && colocatedDataSources.stream()
-                                       .anyMatch(source -> eachHolder.getServer().getDataSource(source) != null)) {
-              loadServerHolders.add(eachHolder);
-            } else if (eachHolder.isServingSegment(segment)) {
-              if (!eachHolder.getPeon().getSegmentsToDrop().contains(segment)) {
-                dropServerHolders.add(eachHolder);
-              }
-            }
-          }
-      );
-    }
 
-    final CoordinatorStats stats = new CoordinatorStats();
+    // Find servers where we need to load the broadcast segments
+    final Set<ServerHolder> loadServerHolders =
+        params.getDruidCluster().getAllServers()
+              .stream()
+              .filter(
+                  (serverHolder) -> {
+                    ServerType serverType = serverHolder.getServer().getType();
+                    if (!serverType.isSegmentBroadcastTarget()) {
+                      return false;
+                    }
+
+                    final boolean isServingSegment =
+                        serverHolder.isServingSegment(segment);
+
+                    if (serverHolder.isDecommissioning()) {
+                      if (isServingSegment && !serverHolder.isDroppingSegment(segment)) {
+                        dropServerHolders.add(serverHolder);
+                      }
+                      return false;
+                    }
 
+                    return !isServingSegment && !serverHolder.isLoadingSegment(segment);
+                  }
+              )
+              .collect(Collectors.toSet());
+
+    final CoordinatorStats stats = new CoordinatorStats();
     return stats.accumulate(assign(loadServerHolders, segment))
                 .accumulate(drop(dropServerHolders, segment));
   }
@@ -110,6 +116,4 @@ public abstract class BroadcastDistributionRule implements Rule
 
     return stats;
   }
-
-  public abstract List<String> getColocatedDataSources();
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java
index d095f11..ef5094c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java
@@ -25,21 +25,16 @@ import org.apache.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
-import java.util.List;
 import java.util.Objects;
 
 public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule
 {
   static final String TYPE = "broadcastForever";
 
-  private final List<String> colocatedDataSources;
-
   @JsonCreator
-  public ForeverBroadcastDistributionRule(
-      @JsonProperty("colocatedDataSources") List<String> colocatedDataSources
-  )
+  public ForeverBroadcastDistributionRule()
   {
-    this.colocatedDataSources = colocatedDataSources;
+    // Nothing to initialize: this rule has no configurable properties.
   }
 
   @Override
@@ -50,13 +45,6 @@ public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule
   }
 
   @Override
-  @JsonProperty
-  public List<String> getColocatedDataSources()
-  {
-    return colocatedDataSources;
-  }
-
-  @Override
   public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
   {
     return true;
@@ -79,13 +67,12 @@ public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule
       return false;
     }
 
-    ForeverBroadcastDistributionRule that = (ForeverBroadcastDistributionRule) o;
-    return Objects.equals(colocatedDataSources, that.colocatedDataSources);
+    return true;
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(getType(), colocatedDataSources);
+    return Objects.hash(getType());
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java
index c40dff7..b1bf29e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java
@@ -25,23 +25,19 @@ import org.apache.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
-import java.util.List;
 import java.util.Objects;
 
 public class IntervalBroadcastDistributionRule extends BroadcastDistributionRule
 {
   static final String TYPE = "broadcastByInterval";
   private final Interval interval;
-  private final List<String> colocatedDataSources;
 
   @JsonCreator
   public IntervalBroadcastDistributionRule(
-      @JsonProperty("interval") Interval interval,
-      @JsonProperty("colocatedDataSources") List<String> colocatedDataSources
+      @JsonProperty("interval") Interval interval
   )
   {
     this.interval = interval;
-    this.colocatedDataSources = colocatedDataSources;
   }
 
   @Override
@@ -52,13 +48,6 @@ public class IntervalBroadcastDistributionRule extends BroadcastDistributionRule
   }
 
   @Override
-  @JsonProperty
-  public List<String> getColocatedDataSources()
-  {
-    return colocatedDataSources;
-  }
-
-  @Override
   public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
   {
     return appliesTo(segment.getInterval(), referenceTimestamp);
@@ -79,26 +68,19 @@ public class IntervalBroadcastDistributionRule extends BroadcastDistributionRule
   @Override
   public boolean equals(Object o)
   {
-    if (o == this) {
+    if (this == o) {
       return true;
     }
-
-    if (o == null || o.getClass() != getClass()) {
+    if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
     IntervalBroadcastDistributionRule that = (IntervalBroadcastDistributionRule) o;
-
-    if (!Objects.equals(interval, that.interval)) {
-      return false;
-    }
-
-    return Objects.equals(colocatedDataSources, that.colocatedDataSources);
+    return Objects.equals(getInterval(), that.getInterval());
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(getType(), interval, colocatedDataSources);
+    return Objects.hash(getInterval());
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java
index 97c6e11..d48353d 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java
@@ -26,7 +26,6 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 
-import java.util.List;
 import java.util.Objects;
 
 public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
@@ -36,18 +35,15 @@ public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
 
   private final Period period;
   private final boolean includeFuture;
-  private final List<String> colocatedDataSources;
 
   @JsonCreator
   public PeriodBroadcastDistributionRule(
       @JsonProperty("period") Period period,
-      @JsonProperty("includeFuture") Boolean includeFuture,
-      @JsonProperty("colocatedDataSources") List<String> colocatedDataSources
+      @JsonProperty("includeFuture") Boolean includeFuture
   )
   {
     this.period = period;
     this.includeFuture = includeFuture == null ? DEFAULT_INCLUDE_FUTURE : includeFuture;
-    this.colocatedDataSources = colocatedDataSources;
   }
 
   @Override
@@ -58,13 +54,6 @@ public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
   }
 
   @Override
-  @JsonProperty
-  public List<String> getColocatedDataSources()
-  {
-    return colocatedDataSources;
-  }
-
-  @Override
   public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
   {
     return appliesTo(segment.getInterval(), referenceTimestamp);
@@ -94,25 +83,17 @@ public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
     if (this == o) {
       return true;
     }
-
-    if (o == null || o.getClass() != getClass()) {
+    if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
     PeriodBroadcastDistributionRule that = (PeriodBroadcastDistributionRule) o;
-
-    if (!Objects.equals(period, that.period)) {
-      return false;
-    }
-    if (includeFuture != that.includeFuture) {
-      return false;
-    }
-    return Objects.equals(colocatedDataSources, that.colocatedDataSources);
+    return isIncludeFuture() == that.isIncludeFuture() &&
+           Objects.equals(getPeriod(), that.getPeriod());
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(getType(), period, colocatedDataSources);
+    return Objects.hash(getPeriod(), isIncludeFuture());
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 0402975..b6d310f 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -769,7 +769,7 @@ public class DataSourcesResource
           && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber()
           && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
           && Iterables.any(
-          segmentLoadInfo.getServers(), DruidServerMetadata::segmentReplicatable
+          segmentLoadInfo.getServers(), DruidServerMetadata::isSegmentReplicationTarget
       )) {
         return true;
       }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
index 4715734..f5534bd 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
@@ -174,7 +174,7 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
         )
     );
 
-    Assert.assertFalse(
+    Assert.assertTrue(
         CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
             Collections.singletonList(
                 new ImmutableSegmentLoadInfo(
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index f5455fb..6d8ef0a 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@@ -32,6 +34,7 @@ import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.loading.CacheTestSegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -39,12 +42,16 @@ import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -67,6 +74,7 @@ public class SegmentLoadDropHandlerTest
   private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
 
   private SegmentLoadDropHandler segmentLoadDropHandler;
+
   private DataSegmentAnnouncer announcer;
   private File infoDir;
   private AtomicInteger announceCount;
@@ -74,22 +82,36 @@ public class SegmentLoadDropHandlerTest
   private CacheTestSegmentLoader segmentLoader;
   private SegmentManager segmentManager;
   private List<Runnable> scheduledRunnable;
+  private SegmentLoaderConfig segmentLoaderConfig;
+  private SegmentLoaderConfig segmentLoaderConfigNoLocations;
+  private ScheduledExecutorFactory scheduledExecutorFactory;
+  private List<StorageLocationConfig> locations;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   @Before
   public void setUp()
   {
     try {
-      infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest");
-      infoDir.mkdirs();
-      for (File file : infoDir.listFiles()) {
-        file.delete();
-      }
+      infoDir = temporaryFolder.newFolder();
       log.info("Creating tmp test files in [%s]", infoDir);
     }
     catch (IOException e) {
       throw new RuntimeException(e);
     }
 
+    locations = Collections.singletonList(
+        new StorageLocationConfig(
+            infoDir,
+            100L,
+            100d
+        )
+    );
+
     scheduledRunnable = new ArrayList<>();
 
     segmentLoader = new CacheTestSegmentLoader();
@@ -132,57 +154,91 @@ public class SegmentLoadDropHandlerTest
       }
     };
 
-    segmentLoadDropHandler = new SegmentLoadDropHandler(
-        jsonMapper,
-        new SegmentLoaderConfig()
-        {
-          @Override
-          public File getInfoDir()
-          {
-            return infoDir;
-          }
 
-          @Override
-          public int getNumLoadingThreads()
-          {
-            return 5;
-          }
+    segmentLoaderConfig = new SegmentLoaderConfig()
+    {
+      @Override
+      public File getInfoDir()
+      {
+        return infoDir;
+      }
 
-          @Override
-          public int getAnnounceIntervalMillis()
-          {
-            return 50;
-          }
+      @Override
+      public int getNumLoadingThreads()
+      {
+        return 5;
+      }
+
+      @Override
+      public int getAnnounceIntervalMillis()
+      {
+        return 50;
+      }
+
+      @Override
+      public List<StorageLocationConfig> getLocations()
+      {
+        return locations;
+      }
+
+      @Override
+      public int getDropSegmentDelayMillis()
+      {
+        return 0;
+      }
+    };
+
+    segmentLoaderConfigNoLocations = new SegmentLoaderConfig()
+    {
+      @Override
+      public int getNumLoadingThreads()
+      {
+        return 5;
+      }
+
+      @Override
+      public int getAnnounceIntervalMillis()
+      {
+        return 50;
+      }
 
+
+      @Override
+      public int getDropSegmentDelayMillis()
+      {
+        return 0;
+      }
+    };
+
+    scheduledExecutorFactory = new ScheduledExecutorFactory()
+    {
+      @Override
+      public ScheduledExecutorService create(int corePoolSize, String nameFormat)
+      {
+            /*
+               Override normal behavior by adding the runnable to a list so that you can make sure
+               all the scheduled runnables are executed by explicitly calling run() on each item in the list
+             */
+        return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat))
+        {
           @Override
-          public int getDropSegmentDelayMillis()
+          public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
           {
-            return 0;
+            scheduledRunnable.add(command);
+            return null;
           }
-        },
+        };
+      }
+    };
+
+    segmentLoadDropHandler = new SegmentLoadDropHandler(
+        jsonMapper,
+        segmentLoaderConfig,
         announcer,
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         segmentManager,
-        new ScheduledExecutorFactory()
-        {
-          @Override
-          public ScheduledExecutorService create(int corePoolSize, String nameFormat)
-          {
-            /*
-               Override normal behavoir by adding the runnable to a list so that you can make sure
-               all the shceduled runnables are executed by explicitly calling run() on each item in the list
-             */
-            return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat))
-            {
-              @Override
-              public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
-              {
-                scheduledRunnable.add(command);
-                return null;
-              }
-            };
-          }
-        }.create(5, "SegmentLoadDropHandlerTest-[%d]")
+        scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
+        new ServerTypeConfig(ServerType.HISTORICAL)
     );
   }
 
@@ -220,6 +276,40 @@ public class SegmentLoadDropHandlerTest
     segmentLoadDropHandler.stop();
   }
 
+  @Test
+  public void testSegmentLoading1BrokerWithNoLocations() throws Exception
+  {
+    SegmentLoadDropHandler segmentLoadDropHandlerBrokerWithNoLocations = new SegmentLoadDropHandler(
+        jsonMapper,
+        segmentLoaderConfigNoLocations,
+        announcer,
+        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
+        segmentManager,
+        scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-brokerNoLocations-[%d]"),
+        new ServerTypeConfig(ServerType.BROKER)
+    );
+    // Construction and start must not throw for a broker; stop the handler that was actually started.
+    segmentLoadDropHandlerBrokerWithNoLocations.start();
+    segmentLoadDropHandlerBrokerWithNoLocations.stop();
+  }
+
+  @Test
+  public void testSegmentLoading1HistoricalWithNoLocations()
+  {
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage("Segment cache locations must be set on historicals.");
+
+    new SegmentLoadDropHandler(
+        jsonMapper,
+        segmentLoaderConfigNoLocations,
+        announcer,
+        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
+        segmentManager,
+        scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
+        new ServerTypeConfig(ServerType.HISTORICAL)
+    );
+  }
+
   /**
    * Steps:
    * 1. addSegment() succesfully loads the segment and annouces it
@@ -383,12 +473,19 @@ public class SegmentLoadDropHandlerTest
           }
 
           @Override
+          public List<StorageLocationConfig> getLocations()
+          {
+            return locations;
+          }
+
+          @Override
           public int getAnnounceIntervalMillis()
           {
             return 50;
           }
         },
-        announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager
+        announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager,
+        new ServerTypeConfig(ServerType.HISTORICAL)
     );
 
     Set<DataSegment> segments = new HashSet<>();
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
index 9bdd84e..c30a37c 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
@@ -23,10 +23,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.guice.ServerTypeConfig;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.ServerTestHelper;
 import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -37,9 +40,15 @@ import org.apache.zookeeper.CreateMode;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -47,6 +56,8 @@ import java.util.concurrent.ScheduledExecutorService;
  */
 public class ZkCoordinatorTest extends CuratorTestBase
 {
+  private static final Logger log = new Logger(ZkCoordinatorTest.class);
+
   private final ObjectMapper jsonMapper = ServerTestHelper.MAPPER;
   private final DruidServerMetadata me = new DruidServerMetadata(
       "dummyServer",
@@ -67,9 +78,31 @@ public class ZkCoordinatorTest extends CuratorTestBase
   };
   private ZkCoordinator zkCoordinator;
 
+  private File infoDir;
+  private List<StorageLocationConfig> locations;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   @Before
   public void setUp() throws Exception
   {
+    try {
+      infoDir = temporaryFolder.newFolder();
+      log.info("Creating tmp test files in [%s]", infoDir);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    locations = Collections.singletonList(
+        new StorageLocationConfig(
+            infoDir,
+            100L,
+            100d
+        )
+    );
+
     setupServerAndCurator();
     curator.start();
     curator.blockUntilConnected();
@@ -102,11 +135,42 @@ public class ZkCoordinatorTest extends CuratorTestBase
 
     SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
         ServerTestHelper.MAPPER,
-        new SegmentLoaderConfig(),
+        new SegmentLoaderConfig() {
+          @Override
+          public File getInfoDir()
+          {
+            return infoDir;
+          }
+
+          @Override
+          public int getNumLoadingThreads()
+          {
+            return 5;
+          }
+
+          @Override
+          public int getAnnounceIntervalMillis()
+          {
+            return 50;
+          }
+
+          @Override
+          public List<StorageLocationConfig> getLocations()
+          {
+            return locations;
+          }
+
+          @Override
+          public int getDropSegmentDelayMillis()
+          {
+            return 0;
+          }
+        },
         EasyMock.createNiceMock(DataSegmentAnnouncer.class),
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         EasyMock.createNiceMock(SegmentManager.class),
-        EasyMock.createNiceMock(ScheduledExecutorService.class)
+        EasyMock.createNiceMock(ScheduledExecutorService.class),
+        new ServerTypeConfig(ServerType.HISTORICAL)
     )
     {
       @Override
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
index 084a119..f37c92c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
@@ -43,6 +43,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -66,9 +67,11 @@ public class BalanceSegmentsTest
   private DataSegment segment2;
   private DataSegment segment3;
   private DataSegment segment4;
+  private DataSegment segment5;
   private List<DataSegment> segments;
   private ListeningExecutorService balancerStrategyExecutor;
   private BalancerStrategy balancerStrategy;
+  private Set<String> broadcastDatasources;
 
   @Before
   public void setUp()
@@ -82,6 +85,7 @@ public class BalanceSegmentsTest
     segment2 = EasyMock.createMock(DataSegment.class);
     segment3 = EasyMock.createMock(DataSegment.class);
     segment4 = EasyMock.createMock(DataSegment.class);
+    segment5 = EasyMock.createMock(DataSegment.class);
 
     DateTime start1 = DateTimes.of("2012-01-01");
     DateTime start2 = DateTimes.of("2012-02-01");
@@ -130,12 +134,24 @@ public class BalanceSegmentsTest
         0,
         8L
     );
+    segment5 = new DataSegment(
+        "datasourceBroadcast",
+        new Interval(start2, start2.plusHours(1)),
+        version.toString(),
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        0,
+        8L
+    );
 
     segments = new ArrayList<>();
     segments.add(segment1);
     segments.add(segment2);
     segments.add(segment3);
     segments.add(segment4);
+    segments.add(segment5);
 
     peon1 = new LoadQueuePeonTester();
     peon2 = new LoadQueuePeonTester();
@@ -147,6 +163,8 @@ public class BalanceSegmentsTest
 
     balancerStrategyExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
     balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
+
+    broadcastDatasources = Collections.singleton("datasourceBroadcast");
   }
 
   @After
@@ -187,10 +205,11 @@ public class BalanceSegmentsTest
         ImmutableList.of(peon1, peon2)
     )
         .withBalancerStrategy(predefinedPickOrderStrategy)
+        .withBroadcastDatasources(broadcastDatasources)
         .build();
 
     params = new BalanceSegmentsTester(coordinator).run(params);
-    Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+    Assert.assertEquals(3, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
   }
 
   /**
@@ -213,10 +232,10 @@ public class BalanceSegmentsTest
     mockCoordinator(coordinator);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false))))
+    EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false)), broadcastDatasources))
             .andReturn(new BalancerSegmentHolder(druidServer2, segment3))
             .andReturn(new BalancerSegmentHolder(druidServer2, segment4));
-    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
+    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment1))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment2));
 
@@ -237,6 +256,7 @@ public class BalanceSegmentsTest
                                     .build() // ceil(3 * 0.6) = 2 segments from decommissioning servers
         )
         .withBalancerStrategy(strategy)
+        .withBroadcastDatasources(broadcastDatasources)
         .build();
 
     params = new BalanceSegmentsTester(coordinator).run(params);
@@ -280,7 +300,7 @@ public class BalanceSegmentsTest
     mockCoordinator(coordinator);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
+    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment1))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment2))
             .andReturn(new BalancerSegmentHolder(druidServer2, segment3))
@@ -303,6 +323,7 @@ public class BalanceSegmentsTest
                                     .build()
         )
         .withBalancerStrategy(strategy)
+        .withBroadcastDatasources(broadcastDatasources)
         .build();
 
     params = new BalanceSegmentsTester(coordinator).run(params);
@@ -328,7 +349,7 @@ public class BalanceSegmentsTest
     mockCoordinator(coordinator);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
+    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment1))
             .anyTimes();
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> {
@@ -343,6 +364,7 @@ public class BalanceSegmentsTest
         ImmutableList.of(false, true)
     )
         .withBalancerStrategy(strategy)
+        .withBroadcastDatasources(broadcastDatasources)
         .build();
 
     params = new BalanceSegmentsTester(coordinator).run(params);
@@ -362,7 +384,7 @@ public class BalanceSegmentsTest
 
     ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
+    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment1))
             .once();
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
@@ -377,6 +399,7 @@ public class BalanceSegmentsTest
     )
         .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(1).build())
         .withBalancerStrategy(strategy)
+        .withBroadcastDatasources(broadcastDatasources)
         .build();
 
     params = new BalanceSegmentsTester(coordinator).run(params);
@@ -412,6 +435,7 @@ public class BalanceSegmentsTest
         ImmutableList.of(peon1, peon2)
     )
         .withBalancerStrategy(predefinedPickOrderStrategy)
+        .withBroadcastDatasources(broadcastDatasources)
         .withDynamicConfigs(
             CoordinatorDynamicConfig
                 .builder()
@@ -451,6 +475,7 @@ public class BalanceSegmentsTest
         ImmutableList.of(peon1, peon2)
     )
         .withBalancerStrategy(predefinedPickOrderStrategy)
+        .withBroadcastDatasources(broadcastDatasources)
         .withDynamicConfigs(
             CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
                 2
@@ -542,6 +567,7 @@ public class BalanceSegmentsTest
         )
         .withUsedSegmentsInTest(segments)
         .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
+        .withBroadcastDatasources(broadcastDatasources)
         .withBalancerStrategy(balancerStrategy);
   }
 
@@ -611,7 +637,7 @@ public class BalanceSegmentsTest
     }
 
     @Override
-    public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
+    public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources)
     {
       return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size());
     }
@@ -635,9 +661,9 @@ public class BalanceSegmentsTest
 
     // either decommissioning servers list or acitve ones (ie servers list is [2] or [1, 3])
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true))))
+    EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true)), broadcastDatasources))
             .andReturn(new BalancerSegmentHolder(druidServer2, segment2));
-    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
+    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment1));
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
             .andReturn(new ServerHolder(druidServer3, peon3))
@@ -656,6 +682,7 @@ public class BalanceSegmentsTest
                                     .build()
         )
         .withBalancerStrategy(strategy)
+        .withBroadcastDatasources(broadcastDatasources)
         .build();
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java
index 772b7ae..5fb1000 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java
@@ -35,6 +35,7 @@ public final class DruidClusterBuilder
 
   private @Nullable Set<ServerHolder> realtimes = null;
   private final Map<String, Iterable<ServerHolder>> historicals = new HashMap<>();
+  private @Nullable Set<ServerHolder> brokers = null;
 
   private DruidClusterBuilder()
   {
@@ -46,6 +47,12 @@ public final class DruidClusterBuilder
     return this;
   }
 
+  public DruidClusterBuilder withBrokers(ServerHolder... brokers)
+  {
+    this.brokers = new HashSet<>(Arrays.asList(brokers));
+    return this;
+  }
+
   public DruidClusterBuilder addTier(String tierName, ServerHolder... historicals)
   {
     if (this.historicals.putIfAbsent(tierName, Arrays.asList(historicals)) != null) {
@@ -56,6 +63,6 @@ public final class DruidClusterBuilder
 
   public DruidCluster build()
   {
-    return DruidCluster.createDruidClusterFromBuilderInTest(realtimes, historicals);
+    return DruidCluster.createDruidClusterFromBuilderInTest(realtimes, historicals, brokers);
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
index 73e829c..8aef2f2 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.client.ImmutableDruidServerTests;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.easymock.EasyMock;
@@ -136,6 +137,7 @@ public class ReservoirSegmentSamplerTest
   @Test
   public void getRandomBalancerSegmentHolderTest()
   {
+    EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
     EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
     EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
     EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
@@ -143,6 +145,7 @@ public class ReservoirSegmentSamplerTest
     EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
     EasyMock.replay(druidServer1);
 
+    EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
     EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce();
     EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
     EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce();
@@ -151,6 +154,7 @@ public class ReservoirSegmentSamplerTest
     EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
     EasyMock.replay(druidServer2);
 
+    EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
     EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce();
     EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
     EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce();
@@ -159,6 +163,7 @@ public class ReservoirSegmentSamplerTest
     EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
     EasyMock.replay(druidServer3);
 
+    EasyMock.expect(druidServer4.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
     EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce();
     EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
     EasyMock.expect(druidServer4.getCurrSize()).andReturn(30L).atLeastOnce();
@@ -186,7 +191,7 @@ public class ReservoirSegmentSamplerTest
 
     Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
     for (int i = 0; i < 5000; i++) {
-      segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList).getSegment(), 1);
+      segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet()).getSegment(), 1);
     }
 
     for (DataSegment segment : segments) {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java
index e3b51a5..0dfe0ea 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.server.coordinator.rules;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
@@ -44,15 +43,15 @@ public class BroadcastDistributionRuleSerdeTest
   public static List<Object[]> constructorFeeder()
   {
     return Lists.newArrayList(
-        new Object[]{new ForeverBroadcastDistributionRule(ImmutableList.of("large_source1", "large_source2"))},
-        new Object[]{new ForeverBroadcastDistributionRule(ImmutableList.of())},
-        new Object[]{new ForeverBroadcastDistributionRule(null)},
-        new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), ImmutableList.of("large_source"))},
-        new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), ImmutableList.of())},
-        new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), null)},
-        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, ImmutableList.of("large_source"))},
-        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, ImmutableList.of())},
-        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, null)}
+        new Object[]{new ForeverBroadcastDistributionRule()},
+        new Object[]{new ForeverBroadcastDistributionRule()},
+        new Object[]{new ForeverBroadcastDistributionRule()},
+        new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))},
+        new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))},
+        new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))},
+        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)},
+        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)},
+        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)}
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
index 70ec3eb..c2d4fd3 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.server.coordinator.rules;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
@@ -269,7 +268,7 @@ public class BroadcastDistributionRuleTest
   public void testBroadcastToSingleDataSource()
   {
     final ForeverBroadcastDistributionRule rule =
-        new ForeverBroadcastDistributionRule(ImmutableList.of("large_source"));
+        new ForeverBroadcastDistributionRule();
 
     CoordinatorStats stats = rule.run(
         null,
@@ -285,7 +284,7 @@ public class BroadcastDistributionRuleTest
         smallSegment
     );
 
-    Assert.assertEquals(3L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
+    Assert.assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
     Assert.assertFalse(stats.hasPerTierStats());
 
     Assert.assertTrue(
@@ -295,10 +294,10 @@ public class BroadcastDistributionRuleTest
 
     Assert.assertTrue(
         holdersOfLargeSegments2.stream()
-                               .noneMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment))
+                               .allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment))
     );
 
-    Assert.assertFalse(holderOfSmallSegment.getPeon().getSegmentsToLoad().contains(smallSegment));
+    Assert.assertTrue(holderOfSmallSegment.isServingSegment(smallSegment));
   }
 
   private static DruidCoordinatorRuntimeParams makeCoordinartorRuntimeParams(
@@ -331,7 +330,7 @@ public class BroadcastDistributionRuleTest
   public void testBroadcastDecommissioning()
   {
     final ForeverBroadcastDistributionRule rule =
-        new ForeverBroadcastDistributionRule(ImmutableList.of("large_source"));
+        new ForeverBroadcastDistributionRule();
 
     CoordinatorStats stats = rule.run(
         null,
@@ -356,7 +355,6 @@ public class BroadcastDistributionRuleTest
   public void testBroadcastToMultipleDataSources()
   {
     final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(
-        ImmutableList.of("large_source", "large_source2")
     );
 
     CoordinatorStats stats = rule.run(
@@ -392,7 +390,7 @@ public class BroadcastDistributionRuleTest
   @Test
   public void testBroadcastToAllServers()
   {
-    final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(null);
+    final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
 
     CoordinatorStats stats = rule.run(
         null,
@@ -408,14 +406,14 @@ public class BroadcastDistributionRuleTest
         smallSegment
     );
 
-    Assert.assertEquals(6L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
+    Assert.assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
     Assert.assertFalse(stats.hasPerTierStats());
 
     Assert.assertTrue(
         druidCluster
             .getAllServers()
             .stream()
-            .allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment))
+            .allMatch(holder -> holder.isLoadingSegment(smallSegment) || holder.isServingSegment(smallSegment))
     );
   }
 }
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 5badcb0..18d2813 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -33,6 +33,7 @@ import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig;
 import org.apache.druid.client.selector.ServerSelectorStrategy;
 import org.apache.druid.client.selector.TierSelectorStrategy;
+import org.apache.druid.discovery.DataNodeService;
 import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.CacheModule;
@@ -42,9 +43,11 @@ import org.apache.druid.guice.JoinableFactoryModule;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.QueryRunnerFactoryModule;
 import org.apache.druid.guice.QueryableModule;
 import org.apache.druid.guice.SegmentWranglerModule;
+import org.apache.druid.guice.ServerTypeConfig;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.QuerySegmentWalker;
 import org.apache.druid.query.RetryQueryRunnerConfig;
@@ -52,7 +55,12 @@ import org.apache.druid.query.lookup.LookupModule;
 import org.apache.druid.server.BrokerQueryResource;
 import org.apache.druid.server.ClientInfoResource;
 import org.apache.druid.server.ClientQuerySegmentWalker;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordination.ZkCoordinator;
 import org.apache.druid.server.http.BrokerResource;
+import org.apache.druid.server.http.HistoricalResource;
+import org.apache.druid.server.http.SegmentListerResource;
 import org.apache.druid.server.http.SelfDiscoveryResource;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
 import org.apache.druid.server.metrics.QueryCountStatsProvider;
@@ -123,12 +131,19 @@ public class CliBroker extends ServerRunnable
           Jerseys.addResource(binder, HttpServerInventoryViewResource.class);
 
           LifecycleModule.register(binder, Server.class);
+          binder.bind(SegmentManager.class).in(LazySingleton.class);
+          binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
+          binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.BROKER));
+          Jerseys.addResource(binder, HistoricalResource.class);
+          Jerseys.addResource(binder, SegmentListerResource.class);
+
+          LifecycleModule.register(binder, ZkCoordinator.class);
 
           bindNodeRoleAndAnnouncer(
               binder,
               DiscoverySideEffectsProvider
                   .builder(NodeRole.BROKER)
-                  .serviceClasses(ImmutableList.of(LookupNodeService.class))
+                  .serviceClasses(ImmutableList.of(DataNodeService.class, LookupNodeService.class))
                   .useLegacyAnnouncer(true)
                   .build()
           );
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 7483fec..d4a8df1 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -25,9 +25,9 @@ import com.google.inject.Inject;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.name.Names;
-import com.google.inject.util.Providers;
 import io.airlift.airline.Command;
 import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.DruidServerConfig;
 import org.apache.druid.discovery.DataNodeService;
 import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.discovery.NodeRole;
@@ -43,6 +43,7 @@ import org.apache.druid.guice.JoinableFactoryModule;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.QueryRunnerFactoryModule;
 import org.apache.druid.guice.QueryableModule;
 import org.apache.druid.guice.QueryablePeonModule;
@@ -60,12 +61,13 @@ import org.apache.druid.indexing.worker.http.ShuffleResource;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.QuerySegmentWalker;
 import org.apache.druid.query.lookup.LookupModule;
-import org.apache.druid.segment.realtime.CliIndexerDataSegmentServerAnnouncerLifecycleHandler;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
 import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordination.ZkCoordinator;
+import org.apache.druid.server.http.HistoricalResource;
 import org.apache.druid.server.http.SegmentListerResource;
 import org.apache.druid.server.initialization.jetty.CliIndexerServerModule;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
@@ -138,14 +140,14 @@ public class CliIndexer extends ServerRunnable
 
             binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
             Jerseys.addResource(binder, SegmentListerResource.class);
-
-            LifecycleModule.register(binder, CliIndexerDataSegmentServerAnnouncerLifecycleHandler.class);
-
             Jerseys.addResource(binder, ShuffleResource.class);
 
             LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
 
-            binder.bind(SegmentLoadDropHandler.class).toProvider(Providers.of(null));
+            binder.bind(SegmentManager.class).in(LazySingleton.class);
+            binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
+            Jerseys.addResource(binder, HistoricalResource.class);
+            LifecycleModule.register(binder, ZkCoordinator.class);
 
             bindNodeRoleAndAnnouncer(
                 binder,
@@ -186,11 +188,11 @@ public class CliIndexer extends ServerRunnable
 
           @Provides
           @LazySingleton
-          public DataNodeService getDataNodeService()
+          public DataNodeService getDataNodeService(DruidServerConfig serverConfig)
           {
             return new DataNodeService(
                 DruidServer.DEFAULT_TIER,
-                0L,
+                serverConfig.getMaxSize(),
                 ServerType.INDEXER_EXECUTOR,
                 DruidServer.DEFAULT_PRIORITY
             );
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 1160eb9..59ef48a 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -61,7 +61,6 @@ import org.apache.druid.guice.ServerTypeConfig;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Parent;
 import org.apache.druid.guice.annotations.Self;
-import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
@@ -109,15 +108,16 @@ import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffN
 import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
+import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordination.ZkCoordinator;
+import org.apache.druid.server.http.HistoricalResource;
 import org.apache.druid.server.http.SegmentListerResource;
 import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
 import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
 import org.eclipse.jetty.server.Server;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
@@ -154,6 +154,14 @@ public class CliPeon extends GuiceRunnable
   @Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
   public String serverType = "indexer-executor";
 
+
+  /**
+   * If set to "true", the peon will bind classes necessary for loading broadcast segments. This is used for
+   * queryable tasks, such as streaming ingestion tasks.
+   */
+  @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments")
+  public String loadBroadcastSegments = "false";
+
   private static final Logger log = new Logger(CliPeon.class);
 
   @Inject
@@ -174,6 +182,7 @@ public class CliPeon extends GuiceRunnable
         new JoinableFactoryModule(),
         new Module()
         {
+          @SuppressForbidden(reason = "System#out, System#err")
           @Override
           public void configure(Binder binder)
           {
@@ -218,6 +227,13 @@ public class CliPeon extends GuiceRunnable
             Jerseys.addResource(binder, SegmentListerResource.class);
             binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType)));
             LifecycleModule.register(binder, Server.class);
+
+            if ("true".equals(loadBroadcastSegments)) {
+              binder.bind(SegmentManager.class).in(LazySingleton.class);
+              binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
+              Jerseys.addResource(binder, HistoricalResource.class);
+              LifecycleModule.register(binder, ZkCoordinator.class);
+            }
           }
 
           @Provides
@@ -247,16 +263,6 @@ public class CliPeon extends GuiceRunnable
           {
             return task.getId();
           }
-
-          @Provides
-          public SegmentListerResource getSegmentListerResource(
-              @Json ObjectMapper jsonMapper,
-              @Smile ObjectMapper smileMapper,
-              @Nullable BatchDataSegmentAnnouncer announcer
-          )
-          {
-            return new SegmentListerResource(jsonMapper, smileMapper, announcer, null);
-          }
         },
         new QueryablePeonModule(),
         new IndexingServiceFirehoseModule(),
diff --git a/services/src/test/java/org/apache/druid/cli/MainTest.java b/services/src/test/java/org/apache/druid/cli/MainTest.java
index 3e960f6..a1fbed5 100644
--- a/services/src/test/java/org/apache/druid/cli/MainTest.java
+++ b/services/src/test/java/org/apache/druid/cli/MainTest.java
@@ -50,7 +50,9 @@ public class MainTest
         //new Object[]{new CliInternalHadoopIndexer()},
 
         new Object[]{new CliMiddleManager()},
-        new Object[]{new CliRouter()}
+        new Object[]{new CliRouter()},
+
+        new Object[]{new CliIndexer()}
     );
   }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 9f087ea..54e7e5a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -355,7 +355,7 @@ public class DruidSchema extends AbstractSchema
       AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null;
       if (segmentMetadata == null) {
         // segmentReplicatable is used to determine if segments are served by historical or realtime servers
-        long isRealtime = server.segmentReplicatable() ? 0 : 1;
+        long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1;
         segmentMetadata = AvailableSegmentMetadata.builder(
             segment,
             isRealtime,
@@ -366,7 +366,7 @@ public class DruidSchema extends AbstractSchema
         // Unknown segment.
         setAvailableSegmentMetadata(segment.getId(), segmentMetadata);
         segmentsNeedingRefresh.add(segment.getId());
-        if (!server.segmentReplicatable()) {
+        if (!server.isSegmentReplicationTarget()) {
           log.debug("Added new mutable segment[%s].", segment.getId());
           mutableSegments.add(segment.getId());
         } else {
@@ -384,7 +384,7 @@ public class DruidSchema extends AbstractSchema
             .withRealtime(recomputeIsRealtime(servers))
             .build();
         knownSegments.put(segment.getId(), metadataWithNumReplicas);
-        if (server.segmentReplicatable()) {
+        if (server.isSegmentReplicationTarget()) {
           // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
           // even if it's also available on non-replicatable (realtime) servers.
           mutableSegments.remove(segment.getId());
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index e3579aa..0832858 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -11887,7 +11887,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
 
   @Test
   @Parameters(source = QueryContextForJoinProvider.class)
-  public void testNestedGroupByOnInlineDataSourceWithFilterIsNotSupported(Map<String, Object> queryContext) throws Exception
+  public void testNestedGroupByOnInlineDataSourceWithFilter(Map<String, Object> queryContext) throws Exception
   {
     try {
       testQuery(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org