You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/04/11 15:45:46 UTC

[incubator-druid] branch master updated: Coordinator: Allow dropping all segments. (#7447)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a517f8c  Coordinator: Allow dropping all segments. (#7447)
a517f8c is described below

commit a517f8ce49b7cb967cf791fa7b0f7cc90a3e11d1
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Thu Apr 11 08:45:38 2019 -0700

    Coordinator: Allow dropping all segments. (#7447)
    
    Removes the coordinator sanity check that prevents it from dropping all
    segments. It's useful to get rid of this, since the behavior is
    unintuitive for dev/testing clusters where users might regularly want
    to drop all their data to get back to a clean slate.
    
    But the sanity check was there for a reason: to prevent a race condition
    where the coordinator might drop all segments if it ran before the
    first metadata store poll finished. This patch addresses that concern
    differently, by allowing methods in MetadataSegmentManager to return
    null if a poll has not happened yet, and canceling coordinator runs
    in that case.
    
    This patch also makes the "dataSources" reference in
    SQLMetadataSegmentManager volatile. I'm not sure why it wasn't volatile
    before, but it seems necessary to me: it's not final, and it's dereferenced
    from multiple threads without synchronization.
---
 docs/content/design/coordinator.md                 |  3 +-
 .../druid/metadata/MetadataSegmentManager.java     | 11 +++
 .../druid/metadata/SQLMetadataSegmentManager.java  | 90 ++++++++++++++--------
 .../druid/server/coordinator/DruidCoordinator.java | 50 ++++++++++--
 .../helper/DruidCoordinatorCleanupUnneeded.java    | 22 ++----
 .../coordinator/helper/DruidCoordinatorHelper.java | 10 ++-
 .../helper/DruidCoordinatorSegmentInfoLoader.java  |  8 +-
 .../apache/druid/server/http/MetadataResource.java | 11 ++-
 .../metadata/SQLMetadataSegmentManagerTest.java    | 39 ++++++++++
 9 files changed, 186 insertions(+), 58 deletions(-)

diff --git a/docs/content/design/coordinator.md b/docs/content/design/coordinator.md
index 810f212..cf3c0f8 100644
--- a/docs/content/design/coordinator.md
+++ b/docs/content/design/coordinator.md
@@ -52,8 +52,7 @@ Segments can be automatically loaded and dropped from the cluster based on a set
 
 ### Cleaning Up Segments
 
-Each run, the Druid Coordinator compares the list of available database segments in the database with the current segments in the cluster. Segments that are not in the database but are still being served in the cluster are flagged and appended to a removal list. Segments that are overshadowed (their versions are too old and their data has been replaced by newer segments) are also dropped.
-Note that if all segments in database are deleted(or marked unused), then Coordinator will not drop anything from the Historicals. This is done to prevent a race condition in which the Coordinator would drop all segments if it started running cleanup before it finished polling the database for available segments for the first time and believed that there were no segments.
+Each run, the Druid coordinator compares the list of available segments in the database with the current segments in the cluster. Segments that are not in the database but are still being served in the cluster are flagged and appended to a removal list. Segments that are overshadowed (their versions are too old and their data has been replaced by newer segments) are also dropped.
 
 ### Segment Availability
 
diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
index 436ad12..a5584a9 100644
--- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
@@ -58,6 +58,13 @@ public interface MetadataSegmentManager
   @Nullable
   ImmutableDruidDataSource getDataSource(String dataSourceName);
 
+  /**
+   * Returns a collection of known datasources.
+   *
+   * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
+   * not yet been polled.)
+   */
+  @Nullable
   Collection<ImmutableDruidDataSource> getDataSources();
 
   /**
@@ -65,7 +72,11 @@ public interface MetadataSegmentManager
    * unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
    * (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
    * several times.
+   *
+   * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
+   * not yet been polled.)
    */
+  @Nullable
   Iterable<DataSegment> iterateAllSegments();
 
   Collection<String> getAllDataSourceNames();
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
index dff9c9d..0bcf9fa 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
@@ -66,6 +66,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -73,6 +74,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
+ *
  */
 @ManageLifecycle
 public class SQLMetadataSegmentManager implements MetadataSegmentManager
@@ -102,9 +104,14 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   private final Supplier<MetadataStorageTablesConfig> dbTables;
   private final SQLMetadataConnector connector;
 
-  private ConcurrentHashMap<String, DruidDataSource> dataSources = new ConcurrentHashMap<>();
+  // Volatile since this reference is reassigned in "poll" and then read from in other threads.
+  // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty map)
+  @Nullable
+  private volatile ConcurrentHashMap<String, DruidDataSource> dataSources = null;
 
-  /** The number of times this SQLMetadataSegmentManager was started. */
+  /**
+   * The number of times this SQLMetadataSegmentManager was started.
+   */
   private long startCount = 0;
   /**
    * Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if
@@ -200,7 +207,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
         return;
       }
 
-      dataSources = new ConcurrentHashMap<>();
+      dataSources = null;
       currentStartOrder = -1;
       exec.shutdownNow();
       exec = null;
@@ -325,7 +332,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
           ).bind("dataSource", dataSource).execute()
       );
 
-      dataSources.remove(dataSource);
+      Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource));
 
       if (removed == 0) {
         return false;
@@ -348,18 +355,21 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
       // Call iteratePossibleParsingsWithDataSource() outside of dataSources.computeIfPresent() because the former is a
       // potentially expensive operation, while lambda to be passed into computeIfPresent() should preferably run fast.
       List<SegmentId> possibleSegmentIds = SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId);
-      dataSources.computeIfPresent(
-          dataSourceName,
-          (dsName, dataSource) -> {
-            for (SegmentId possibleSegmentId : possibleSegmentIds) {
-              if (dataSource.removeSegment(possibleSegmentId) != null) {
-                break;
-              }
-            }
-            // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
-            //noinspection ReturnOfNull
-            return dataSource.isEmpty() ? null : dataSource;
-          }
+      Optional.ofNullable(dataSources).ifPresent(
+          m ->
+              m.computeIfPresent(
+                  dataSourceName,
+                  (dsName, dataSource) -> {
+                    for (SegmentId possibleSegmentId : possibleSegmentIds) {
+                      if (dataSource.removeSegment(possibleSegmentId) != null) {
+                        break;
+                      }
+                    }
+                    // Returning null from the lambda here makes the ConcurrentHashMap remove the current entry.
+                    //noinspection ReturnOfNull
+                    return dataSource.isEmpty() ? null : dataSource;
+                  }
+              )
       );
 
       return removed;
@@ -375,14 +385,17 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   {
     try {
       final boolean removed = removeSegmentFromTable(segmentId.toString());
-      dataSources.computeIfPresent(
-          segmentId.getDataSource(),
-          (dsName, dataSource) -> {
-            dataSource.removeSegment(segmentId);
-            // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
-            //noinspection ReturnOfNull
-            return dataSource.isEmpty() ? null : dataSource;
-          }
+      Optional.ofNullable(dataSources).ifPresent(
+          m ->
+              m.computeIfPresent(
+                  segmentId.getDataSource(),
+                  (dsName, dataSource) -> {
+                    dataSource.removeSegment(segmentId);
+                    // Returning null from the lambda here makes the ConcurrentHashMap remove the current entry.
+                    //noinspection ReturnOfNull
+                    return dataSource.isEmpty() ? null : dataSource;
+                  }
+              )
       );
       return removed;
     }
@@ -422,23 +435,37 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   @Nullable
   public ImmutableDruidDataSource getDataSource(String dataSourceName)
   {
-    final DruidDataSource dataSource = dataSources.get(dataSourceName);
+    final DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(dataSourceName)).orElse(null);
     return dataSource == null ? null : dataSource.toImmutableDruidDataSource();
   }
 
   @Override
+  @Nullable
   public Collection<ImmutableDruidDataSource> getDataSources()
   {
-    return dataSources.values()
-                      .stream()
-                      .map(DruidDataSource::toImmutableDruidDataSource)
-                      .collect(Collectors.toList());
+    return Optional.ofNullable(dataSources)
+                   .map(m ->
+                            m.values()
+                             .stream()
+                             .map(DruidDataSource::toImmutableDruidDataSource)
+                             .collect(Collectors.toList())
+                   )
+                   .orElse(null);
   }
 
   @Override
+  @Nullable
   public Iterable<DataSegment> iterateAllSegments()
   {
-    return () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
+    final ConcurrentHashMap<String, DruidDataSource> dataSourcesSnapshot = dataSources;
+    if (dataSourcesSnapshot == null) {
+      return null;
+    }
+
+    return () -> dataSourcesSnapshot.values()
+                                    .stream()
+                                    .flatMap(dataSource -> dataSource.getSegments().stream())
+                                    .iterator();
   }
 
   @Override
@@ -543,6 +570,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
               .addSegmentIfAbsent(segment);
         });
 
+    // Replace "dataSources" atomically.
     dataSources = newDataSources;
   }
 
@@ -557,7 +585,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
    */
   private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment)
   {
-    DruidDataSource dataSource = dataSources.get(segment.getDataSource());
+    DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(segment.getDataSource())).orElse(null);
     if (dataSource == null) {
       return segment;
     }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 3b5faef..27bee2f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -74,8 +74,10 @@ import org.apache.druid.timeline.SegmentId;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -88,6 +90,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
 /**
+ *
  */
 @ManageLifecycle
 public class DruidCoordinator
@@ -242,7 +245,9 @@ public class DruidCoordinator
     return loadManagementPeons;
   }
 
-  /** @return tier -> { dataSource -> underReplicationCount } map */
+  /**
+   * @return tier -> { dataSource -> underReplicationCount } map
+   */
   public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
   {
     final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
@@ -251,9 +256,15 @@ public class DruidCoordinator
       return underReplicationCountsPerDataSourcePerTier;
     }
 
+    final Iterable<DataSegment> dataSegments = iterateAvailableDataSegments();
+
+    if (dataSegments == null) {
+      return underReplicationCountsPerDataSourcePerTier;
+    }
+
     final DateTime now = DateTimes.nowUtc();
 
-    for (final DataSegment segment : iterateAvailableDataSegments()) {
+    for (final DataSegment segment : dataSegments) {
       final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
 
       for (final Rule rule : rules) {
@@ -285,7 +296,13 @@ public class DruidCoordinator
       return retVal;
     }
 
-    for (DataSegment segment : iterateAvailableDataSegments()) {
+    final Iterable<DataSegment> dataSegments = iterateAvailableDataSegments();
+
+    if (dataSegments == null) {
+      return retVal;
+    }
+
+    for (DataSegment segment : dataSegments) {
       if (segmentReplicantLookup.getLoadedReplicants(segment.getId()) == 0) {
         retVal.addTo(segment.getDataSource(), 1);
       } else {
@@ -298,8 +315,14 @@ public class DruidCoordinator
 
   public Map<String, Double> getLoadStatus()
   {
-    Map<String, Double> loadStatus = new HashMap<>();
-    for (ImmutableDruidDataSource dataSource : metadataSegmentManager.getDataSources()) {
+    final Map<String, Double> loadStatus = new HashMap<>();
+    final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();
+
+    if (dataSources == null) {
+      return loadStatus;
+    }
+
+    for (ImmutableDruidDataSource dataSource : dataSources) {
       final Set<DataSegment> segments = Sets.newHashSet(dataSource.getSegments());
       final int availableSegmentSize = segments.size();
 
@@ -453,7 +476,11 @@ public class DruidCoordinator
    * is unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
    * (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
    * several times.
+   *
+   * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
+   * not yet been polled.)
    */
+  @Nullable
   public Iterable<DataSegment> iterateAvailableDataSegments()
   {
     return metadataSegmentManager.iterateAllSegments();
@@ -643,10 +670,16 @@ public class DruidCoordinator
         BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);
 
         // Do coordinator stuff.
+        final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();
+        if (dataSources == null) {
+          log.info("Metadata store not polled yet, skipping this run.");
+          return;
+        }
+
         DruidCoordinatorRuntimeParams params =
             DruidCoordinatorRuntimeParams.newBuilder()
                                          .withStartTime(startTime)
-                                         .withDataSources(metadataSegmentManager.getDataSources())
+                                         .withDataSources(dataSources)
                                          .withDynamicConfigs(getDynamicConfigs())
                                          .withCompactionConfig(getCompactionConfig())
                                          .withEmitter(emitter)
@@ -656,6 +689,11 @@ public class DruidCoordinator
           // Don't read state and run state in the same helper otherwise racy conditions may exist
           if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) {
             params = helper.run(params);
+
+            if (params == null) {
+              // This helper wanted to cancel the run. No log message, since the helper should have logged a reason.
+              return;
+            }
           }
         }
       }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
index a7a1bcc..2e77577 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
@@ -33,6 +33,7 @@ import java.util.Set;
 import java.util.SortedSet;
 
 /**
+ *
  */
 public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
 {
@@ -45,21 +46,12 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
     Set<DataSegment> availableSegments = params.getAvailableSegments();
     DruidCluster cluster = params.getDruidCluster();
 
-    if (availableSegments.isEmpty()) {
-      log.info(
-          "Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent " +
-          "a race condition in which the coordinator would drop all segments if it started running cleanup before " +
-          "it finished polling the metadata storage for available segments for the first time."
-      );
-      return params.buildFromExisting().withCoordinatorStats(stats).build();
-    }
-
-    // Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It might
-    // not have been loaded yet since it's filled asynchronously. But it's also filled atomically, so if there are any
-    // segments at all, we should have all of them.)
-    // Note that if metadata store has no segments, then availableSegments will stay empty and nothing will be dropped.
-    // This is done to prevent a race condition in which the coordinator would drop all segments if it started running
-    // cleanup before it finished polling the metadata storage for available segments for the first time.
+    // Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It's
+    // also filled atomically, so if there are any segments at all, we should have all of them.)
+    //
+    // Note that if the metadata store has not been polled yet, "getAvailableSegments" would throw an error since
+    // "availableSegments" is null. But this won't happen, since the earlier helper "DruidCoordinatorSegmentInfoLoader"
+    // would have canceled the run.
     for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
       for (ServerHolder serverHolder : serverHolders) {
         ImmutableDruidServer server = serverHolder.getServer();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java
index a2752c3..78ee92b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java
@@ -21,7 +21,10 @@ package org.apache.druid.server.coordinator.helper;
 
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 
+import javax.annotation.Nullable;
+
 /**
+ *
  */
 public interface DruidCoordinatorHelper
 {
@@ -29,8 +32,13 @@ public interface DruidCoordinatorHelper
    * Implementations of this method run various activities performed by the coordinator.
    * Input params can be used and modified. They are typically in a list and returned
    * DruidCoordinatorRuntimeParams is passed to the next helper.
+   *
    * @param params
-   * @return same as input or a modified value to be used by next helper.
+   *
+   * @return same as input or a modified value to be used by next helper. A null return
+   * value cancels the current run: no subsequent DruidCoordinatorHelpers will run until
+   * the next cycle.
    */
+  @Nullable
   DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params);
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java
index 801bd3b..2353247 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java
@@ -43,6 +43,12 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
   {
     log.info("Starting coordination. Getting available segments.");
 
+    final Iterable<DataSegment> dataSegments = coordinator.iterateAvailableDataSegments();
+    if (dataSegments == null) {
+      log.info("Metadata store not polled yet, canceling this run.");
+      return null;
+    }
+
     // The following transform() call doesn't actually transform the iterable. It only checks the sizes of the segments
     // and emits alerts if segments with negative sizes are encountered. In other words, semantically it's similar to
     // Stream.peek(). It works as long as DruidCoordinatorRuntimeParams.createAvailableSegmentsSet() (which is called
@@ -54,7 +60,7 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
     //
     //noinspection StaticPseudoFunctionalStyleMethod: https://youtrack.jetbrains.com/issue/IDEA-153047
     Iterable<DataSegment> availableSegmentsWithSizeChecking = Iterables.transform(
-        coordinator.iterateAvailableDataSegments(),
+        dataSegments,
         segment -> {
           if (segment.getSize() < 0) {
             log.makeAlert("No size on a segment")
diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index c7e2702..af106fb 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -55,6 +55,7 @@ import javax.ws.rs.core.StreamingOutput;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
@@ -96,7 +97,10 @@ public class MetadataResource
       @Context final HttpServletRequest req
   )
   {
-    final Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getDataSources();
+    // If we haven't polled the metadata store yet, use an empty list of datasources.
+    final Collection<ImmutableDruidDataSource> druidDataSources = Optional.ofNullable(metadataSegmentManager.getDataSources())
+                                                                          .orElse(Collections.emptyList());
+
     final Set<String> dataSourceNamesPreAuth;
     if (includeDisabled != null) {
       dataSourceNamesPreAuth = new TreeSet<>(metadataSegmentManager.getAllDataSourceNames());
@@ -154,7 +158,10 @@ public class MetadataResource
       @QueryParam("datasources") final Set<String> datasources
   )
   {
-    Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getDataSources();
+    // If we haven't polled the metadata store yet, use an empty list of datasources.
+    Collection<ImmutableDruidDataSource> druidDataSources = Optional.ofNullable(metadataSegmentManager.getDataSources())
+                                                                    .orElse(Collections.emptyList());
+
     if (datasources != null && !datasources.isEmpty()) {
       druidDataSources = druidDataSources.stream()
                                          .filter(src -> datasources.contains(src.getName()))
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
index a9f8f3c..242dc5e 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
@@ -39,6 +39,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.stream.Collectors;
 
 
 public class SQLMetadataSegmentManagerTest
@@ -124,9 +125,47 @@ public class SQLMetadataSegmentManagerTest
         manager.getAllDataSourceNames()
     );
     Assert.assertEquals(
+        ImmutableList.of("wikipedia"),
+        manager.getDataSources().stream().map(d -> d.getName()).collect(Collectors.toList())
+    );
+    Assert.assertEquals(
         ImmutableSet.of(segment1, segment2),
         ImmutableSet.copyOf(manager.getDataSource("wikipedia").getSegments())
     );
+    Assert.assertEquals(
+                        ImmutableSet.of(segment1, segment2),
+                        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+  }
+
+  @Test
+  public void testNoPoll()
+  {
+    manager.start();
+    Assert.assertTrue(manager.isStarted());
+    Assert.assertEquals(
+        ImmutableList.of("wikipedia"),
+        manager.getAllDataSourceNames()
+    );
+    Assert.assertNull(manager.getDataSources());
+    Assert.assertNull(manager.getDataSource("wikipedia"));
+    Assert.assertNull(manager.iterateAllSegments());
+  }
+
+  @Test
+  public void testPollThenStop()
+  {
+    manager.start();
+    manager.poll();
+    manager.stop();
+    Assert.assertFalse(manager.isStarted());
+    Assert.assertEquals(
+        ImmutableList.of("wikipedia"),
+        manager.getAllDataSourceNames()
+    );
+    Assert.assertNull(manager.getDataSources());
+    Assert.assertNull(manager.getDataSource("wikipedia"));
+    Assert.assertNull(manager.iterateAllSegments());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org