You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/01/07 22:43:18 UTC

[incubator-druid] branch master updated: Handoff should ignore segments that are dropped by drop rules (#6676)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ebb7b5  Handoff should ignore segments that are dropped by drop rules (#6676)
8ebb7b5 is described below

commit 8ebb7b558b617d8807e6bdc204cabbda9a4ac346
Author: Mingming Qiu <cs...@gmail.com>
AuthorDate: Tue Jan 8 06:43:11 2019 +0800

    Handoff should ignore segments that are dropped by drop rules (#6676)
    
    * Handoff should ignore segments that are dropped by drop rules
    
    * fix travis-ci
    
    * fix tests
    
    * address comments
    
    * remove line added by accident
    
    * address comments
    
    * add javadoc and logging the full stack trace of exception
    
    * add error message
---
 .../client/coordinator/CoordinatorClient.java      |  34 ++-
 .../CoordinatorBasedSegmentHandoffNotifier.java    |  37 +--
 .../druid/server/http/DatasourcesResource.java     |  89 +++++++
 ...CoordinatorBasedSegmentHandoffNotifierTest.java | 225 +---------------
 .../druid/server/http/DatasourcesResourceTest.java | 283 ++++++++++++++++++++-
 5 files changed, 388 insertions(+), 280 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index 4f59d87..ef731b3 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -23,16 +23,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Throwables;
 import com.google.inject.Inject;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.http.client.response.FullResponseHolder;
+import org.apache.druid.query.SegmentDescriptor;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.joda.time.Interval;
-
-import java.util.List;
 
 public class CoordinatorClient
 {
@@ -49,18 +46,20 @@ public class CoordinatorClient
     this.druidLeaderClient = druidLeaderClient;
   }
 
-
-  public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk)
+  public boolean isHandOffComplete(String dataSource, SegmentDescriptor descriptor)
   {
     try {
       FullResponseHolder response = druidLeaderClient.go(
-          druidLeaderClient.makeRequest(HttpMethod.GET,
-                                        StringUtils.format(
-                                           "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
-                                           dataSource,
-                                           interval.toString().replace('/', '_'),
-                                           incompleteOk
-                                       ))
+          druidLeaderClient.makeRequest(
+              HttpMethod.GET,
+              StringUtils.format(
+                  "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s",
+                  dataSource,
+                  descriptor.getInterval(),
+                  descriptor.getPartitionNumber(),
+                  descriptor.getVersion()
+              )
+          )
       );
 
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -70,12 +69,9 @@ public class CoordinatorClient
             response.getContent()
         );
       }
-      return jsonMapper.readValue(
-          response.getContent(), new TypeReference<List<ImmutableSegmentLoadInfo>>()
-          {
-
-          }
-      );
+      return jsonMapper.readValue(response.getContent(), new TypeReference<Boolean>()
+      {
+      });
     }
     catch (Exception e) {
       throw Throwables.propagate(e);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
index 6d06215..028183f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
@@ -19,18 +19,13 @@
 
 package org.apache.druid.segment.realtime.plumber;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.server.coordination.DruidServerMetadata;
 
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -95,13 +90,7 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
         Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next();
         SegmentDescriptor descriptor = entry.getKey();
         try {
-          List<ImmutableSegmentLoadInfo> loadedSegments = coordinatorClient.fetchServerView(
-              dataSource,
-              descriptor.getInterval(),
-              true
-          );
-
-          if (isHandOffComplete(loadedSegments, entry.getKey())) {
+          if (coordinatorClient.isHandOffComplete(dataSource, descriptor)) {
             log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor);
             entry.getValue().lhs.execute(entry.getValue().rhs);
             itr.remove();
@@ -131,30 +120,6 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
     }
   }
 
-
-  static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
-  {
-    for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
-      if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
-          && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
-             == descriptor.getPartitionNumber()
-          && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
-          && Iterables.any(
-          segmentLoadInfo.getServers(), new Predicate<DruidServerMetadata>()
-          {
-            @Override
-            public boolean apply(DruidServerMetadata input)
-            {
-              return input.segmentReplicatable();
-            }
-          }
-      )) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   @Override
   public void close()
   {
diff --git a/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
index 450dbd4..03d3e6a 100644
--- a/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
@@ -37,8 +37,13 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.guava.FunctionalIterable;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.metadata.MetadataSegmentManager;
+import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordinator.rules.LoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.http.security.DatasourceResourceFilter;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -85,6 +90,7 @@ public class DatasourcesResource
 
   private final CoordinatorServerView serverInventoryView;
   private final MetadataSegmentManager databaseSegmentManager;
+  private final MetadataRuleManager databaseRuleManager;
   private final IndexingServiceClient indexingServiceClient;
   private final AuthConfig authConfig;
   private final AuthorizerMapper authorizerMapper;
@@ -93,6 +99,7 @@ public class DatasourcesResource
   public DatasourcesResource(
       CoordinatorServerView serverInventoryView,
       MetadataSegmentManager databaseSegmentManager,
+      MetadataRuleManager databaseRuleManager,
       @Nullable IndexingServiceClient indexingServiceClient,
       AuthConfig authConfig,
       AuthorizerMapper authorizerMapper
@@ -100,6 +107,7 @@ public class DatasourcesResource
   {
     this.serverInventoryView = serverInventoryView;
     this.databaseSegmentManager = databaseSegmentManager;
+    this.databaseRuleManager = databaseRuleManager;
     this.indexingServiceClient = indexingServiceClient;
     this.authConfig = authConfig;
     this.authorizerMapper = authorizerMapper;
@@ -647,4 +655,85 @@ public class DatasourcesResource
         );
     return Response.ok(retval).build();
   }
+
+  /**
+   * Used by the realtime tasks to learn whether a segment is handed off or not.
+   * It returns true when the segment will never be handed off or is already handed off. Otherwise, it returns false.
+   */
+  @GET
+  @Path("/{dataSourceName}/handoffComplete")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response isHandOffComplete(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") final String interval,
+      @QueryParam("partitionNumber") final int partitionNumber,
+      @QueryParam("version") final String version
+  )
+  {
+    try {
+      final List<Rule> rules = databaseRuleManager.getRulesWithDefault(dataSourceName);
+      final Interval theInterval = Intervals.of(interval);
+      final SegmentDescriptor descriptor = new SegmentDescriptor(theInterval, version, partitionNumber);
+      final DateTime now = DateTimes.nowUtc();
+      // dropped means a segment will never be handed off, i.e it completed hand off
+      // init to true, reset to false only if this segment can be loaded by rules
+      boolean dropped = true;
+      for (Rule rule : rules) {
+        if (rule.appliesTo(theInterval, now)) {
+          if (rule instanceof LoadRule) {
+            dropped = false;
+          }
+          break;
+        }
+      }
+      if (dropped) {
+        return Response.ok(true).build();
+      }
+
+      TimelineLookup<String, SegmentLoadInfo> timeline = serverInventoryView.getTimeline(
+          new TableDataSource(dataSourceName)
+      );
+      if (timeline == null) {
+        log.debug("No timeline found for datasource[%s]", dataSourceName);
+        return Response.ok(false).build();
+      }
+
+      Iterable<TimelineObjectHolder<String, SegmentLoadInfo>> lookup = timeline.lookupWithIncompletePartitions(
+          theInterval);
+      FunctionalIterable<ImmutableSegmentLoadInfo> loadInfoIterable = FunctionalIterable
+          .create(lookup).transformCat(
+              (TimelineObjectHolder<String, SegmentLoadInfo> input) ->
+                  Iterables.transform(
+                      input.getObject(),
+                      (PartitionChunk<SegmentLoadInfo> chunk) ->
+                          chunk.getObject().toImmutableSegmentLoadInfo()
+                  )
+          );
+      if (isSegmentLoaded(loadInfoIterable, descriptor)) {
+        return Response.ok(true).build();
+      }
+
+      return Response.ok(false).build();
+    }
+    catch (Exception e) {
+      log.error(e, "Error while handling hand off check request");
+      return Response.serverError().entity(ImmutableMap.of("error", e.toString())).build();
+    }
+  }
+
+  static boolean isSegmentLoaded(Iterable<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
+  {
+    for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
+      if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
+          && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber()
+          && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
+          && Iterables.any(
+          segmentLoadInfo.getServers(), DruidServerMetadata::segmentReplicatable
+      )) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
index 1596415..228d795 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
@@ -19,23 +19,16 @@
 
 package org.apache.druid.segment.realtime.plumber;
 
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
-import junit.framework.Assert;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
+import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class CoordinatorBasedSegmentHandoffNotifierTest
@@ -55,27 +48,10 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
   {
     Interval interval = Intervals.of("2011-04-01/2011-04-02");
     SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
-    DataSegment segment = new DataSegment(
-        "test_ds",
-        interval,
-        "v1",
-        null,
-        null,
-        null,
-        new NumberedShardSpec(2, 3),
-        0, 0
-    );
 
     CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
-    EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true))
-            .andReturn(
-                Collections.singletonList(
-                    new ImmutableSegmentLoadInfo(
-                        segment,
-                        Sets.newHashSet(createRealtimeServerMetadata("a1"))
-                    )
-                )
-            )
+    EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor))
+            .andReturn(false)
             .anyTimes();
     EasyMock.replay(coordinatorClient);
     CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
@@ -102,27 +78,11 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
   {
     Interval interval = Intervals.of("2011-04-01/2011-04-02");
     SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
-    DataSegment segment = new DataSegment(
-        "test_ds",
-        interval,
-        "v1",
-        null,
-        null,
-        null,
-        new NumberedShardSpec(2, 3),
-        0, 0
-    );
+
     final AtomicBoolean callbackCalled = new AtomicBoolean(false);
     CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
-    EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true))
-            .andReturn(
-                Collections.singletonList(
-                    new ImmutableSegmentLoadInfo(
-                        segment,
-                        Sets.newHashSet(createHistoricalServerMetadata("a1"))
-                    )
-                )
-            )
+    EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor))
+            .andReturn(true)
             .anyTimes();
     EasyMock.replay(coordinatorClient);
     CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
@@ -144,177 +104,4 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
     Assert.assertTrue(callbackCalled.get());
     EasyMock.verify(coordinatorClient);
   }
-
-  @Test
-  public void testHandoffChecksForVersion()
-  {
-    Interval interval = Intervals.of(
-        "2011-04-01/2011-04-02"
-    );
-    Assert.assertFalse(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 2),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v2", 2)
-        )
-    );
-
-    Assert.assertTrue(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v2", 2),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 2)
-        )
-    );
-
-    Assert.assertTrue(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 2),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 2)
-        )
-    );
-
-  }
-
-  @Test
-  public void testHandoffChecksForAssignableServer()
-  {
-    Interval interval = Intervals.of(
-        "2011-04-01/2011-04-02"
-    );
-    Assert.assertTrue(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 2),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 2)
-        )
-    );
-
-    Assert.assertFalse(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 2),
-                    Sets.newHashSet(createRealtimeServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 2)
-        )
-    );
-  }
-
-  @Test
-  public void testHandoffChecksForPartitionNumber()
-  {
-    Interval interval = Intervals.of(
-        "2011-04-01/2011-04-02"
-    );
-    Assert.assertTrue(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 1),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 1)
-        )
-    );
-
-    Assert.assertFalse(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 1),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 2)
-        )
-    );
-
-  }
-
-  @Test
-  public void testHandoffChecksForInterval()
-  {
-
-    Assert.assertFalse(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1)
-        )
-    );
-
-    Assert.assertTrue(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1)
-        )
-    );
-  }
-
-  private DruidServerMetadata createRealtimeServerMetadata(String name)
-  {
-    return createServerMetadata(name, ServerType.REALTIME);
-  }
-
-  private DruidServerMetadata createHistoricalServerMetadata(String name)
-  {
-    return createServerMetadata(name, ServerType.HISTORICAL);
-  }
-
-  private DruidServerMetadata createServerMetadata(String name, ServerType type)
-  {
-    return new DruidServerMetadata(
-        name,
-        name,
-        null,
-        10000,
-        type,
-        "tier",
-        1
-    );
-  }
-
-  private DataSegment createSegment(Interval interval, String version, int partitionNumber)
-  {
-    return new DataSegment(
-        "test_ds",
-        interval,
-        version,
-        null,
-        null,
-        null,
-        new NumberedShardSpec(partitionNumber, 100),
-        0, 0
-    );
-  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
index d42b678..3d2c786 100644
--- a/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
@@ -21,13 +21,23 @@ package org.apache.druid.server.http;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.druid.client.CoordinatorServerView;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.SegmentLoadInfo;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.rules.IntervalDropRule;
+import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthConfig;
@@ -37,6 +47,11 @@ import org.apache.druid.server.security.Authorizer;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineObjectHolder;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.NumberedPartitionChunk;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.PartitionHolder;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.Assert;
@@ -46,6 +61,7 @@ import org.junit.Test;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -156,6 +172,7 @@ public class DatasourcesResourceTest
         inventoryView,
         null,
         null,
+        null,
         new AuthConfig(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER
     );
@@ -240,6 +257,7 @@ public class DatasourcesResourceTest
         inventoryView,
         null,
         null,
+        null,
         new AuthConfig(),
         authMapper
     );
@@ -294,6 +312,7 @@ public class DatasourcesResourceTest
         inventoryView,
         null,
         null,
+        null,
         new AuthConfig(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER
     );
@@ -323,7 +342,7 @@ public class DatasourcesResourceTest
     ).atLeastOnce();
 
     EasyMock.replay(inventoryView, server);
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
     Response response = datasourcesResource.getTheDataSource("datasource1", "full");
     ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity();
     Assert.assertEquals(200, response.getStatus());
@@ -340,7 +359,7 @@ public class DatasourcesResourceTest
     ).atLeastOnce();
 
     EasyMock.replay(inventoryView, server);
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
     Assert.assertEquals(204, datasourcesResource.getTheDataSource("none", null).getStatus());
     EasyMock.verify(inventoryView, server);
   }
@@ -361,7 +380,7 @@ public class DatasourcesResourceTest
     ).atLeastOnce();
 
     EasyMock.replay(inventoryView, server);
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
     Response response = datasourcesResource.getTheDataSource("datasource1", null);
     Assert.assertEquals(200, response.getStatus());
     Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -400,7 +419,7 @@ public class DatasourcesResourceTest
     ).atLeastOnce();
 
     EasyMock.replay(inventoryView, server, server2, server3);
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
     Response response = datasourcesResource.getTheDataSource("datasource1", null);
     Assert.assertEquals(200, response.getStatus());
     Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -431,7 +450,7 @@ public class DatasourcesResourceTest
     List<Interval> expectedIntervals = new ArrayList<>();
     expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z"));
     expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z"));
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
 
     Response response = datasourcesResource.getSegmentDataSourceIntervals("invalidDataSource", null, null);
     Assert.assertEquals(response.getEntity(), null);
@@ -478,7 +497,7 @@ public class DatasourcesResourceTest
     ).atLeastOnce();
     EasyMock.replay(inventoryView);
 
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
     Response response = datasourcesResource.getSegmentDataSourceSpecificInterval(
         "invalidDataSource",
         "2010-01-01/P1D",
@@ -548,6 +567,7 @@ public class DatasourcesResourceTest
     DatasourcesResource datasourcesResource = new DatasourcesResource(
         inventoryView,
         null,
+        null,
         indexingServiceClient,
         new AuthConfig(),
         null
@@ -567,6 +587,7 @@ public class DatasourcesResourceTest
     DatasourcesResource datasourcesResource = new DatasourcesResource(
         inventoryView,
         null,
+        null,
         indexingServiceClient,
         new AuthConfig(),
         null
@@ -579,4 +600,254 @@ public class DatasourcesResourceTest
     EasyMock.verify(indexingServiceClient, server);
   }
 
+  /**
+   * Exercises {@code isHandOffComplete} with a load rule for 2013-01-02/2013-01-03 followed by a
+   * drop rule for 2013-01-01/2013-01-02, in three scenarios: an interval inside the drop rule
+   * (expects true), an interval inside the load rule while the inventory view returns no timeline
+   * (expects false), and an interval inside the load rule for which a stubbed timeline returns the
+   * segment with a historical server attached (expects true).
+   */
+  @Test
+  public void testIsHandOffComplete()
+  {
+    final String dataSource = "dataSource1";
+    final MetadataRuleManager ruleManager = EasyMock.createMock(MetadataRuleManager.class);
+    final Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null);
+    final Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z"));
+    final List<Rule> rules = ImmutableList.of(loadRule, dropRule);
+    final DatasourcesResource resource =
+        new DatasourcesResource(inventoryView, null, ruleManager, null, new AuthConfig(), null);
+
+    // Scenario 1: the requested interval lies inside the drop rule's interval. No inventoryView
+    // expectation is recorded for this scenario; the test expects the entity to be true.
+    EasyMock.expect(ruleManager.getRulesWithDefault(dataSource)).andReturn(rules).once();
+    EasyMock.replay(ruleManager);
+
+    final Response droppedResponse = resource.isHandOffComplete(
+        dataSource,
+        "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z",
+        1,
+        "v1"
+    );
+    Assert.assertTrue((boolean) droppedResponse.getEntity());
+    EasyMock.verify(ruleManager);
+
+    // Scenario 2: the requested interval lies inside the load rule's interval, and the inventory
+    // view returns null instead of a timeline; the test expects the entity to be false.
+    EasyMock.reset(ruleManager);
+    EasyMock.expect(ruleManager.getRulesWithDefault(dataSource)).andReturn(rules).once();
+    EasyMock.expect(inventoryView.getTimeline(new TableDataSource(dataSource))).andReturn(null).once();
+    EasyMock.replay(inventoryView, ruleManager);
+
+    final Response noTimelineResponse = resource.isHandOffComplete(
+        dataSource,
+        "2013-01-02T01:00:00Z/2013-01-02T02:00:00Z",
+        1,
+        "v1"
+    );
+    Assert.assertFalse((boolean) noTimelineResponse.getEntity());
+    EasyMock.verify(inventoryView, ruleManager);
+
+    // Scenario 3: the requested interval lies inside the load rule's interval, and the inventory
+    // view returns a stubbed timeline; the test expects the entity to be true.
+    final String loadedInterval = "2013-01-02T02:00:00Z/2013-01-02T03:00:00Z";
+    final SegmentLoadInfo loadInfo = new SegmentLoadInfo(createSegment(Intervals.of(loadedInterval), "v1", 1));
+    loadInfo.addServer(createHistoricalServerMetadata("test"));
+    // The stub ignores the interval it is queried with and always returns one holder for
+    // loadedInterval / "v1" that wraps loadInfo.
+    final VersionedIntervalTimeline<String, SegmentLoadInfo> timeline =
+        new VersionedIntervalTimeline<String, SegmentLoadInfo>(null)
+    {
+      @Override
+      public List<TimelineObjectHolder<String, SegmentLoadInfo>> lookupWithIncompletePartitions(Interval interval)
+      {
+        final PartitionHolder<SegmentLoadInfo> chunks =
+            new PartitionHolder<>(new NumberedPartitionChunk<>(1, 1, loadInfo));
+        final List<TimelineObjectHolder<String, SegmentLoadInfo>> holders = new ArrayList<>();
+        holders.add(new TimelineObjectHolder<>(Intervals.of(loadedInterval), "v1", chunks));
+        return holders;
+      }
+    };
+    EasyMock.reset(inventoryView, ruleManager);
+    EasyMock.expect(ruleManager.getRulesWithDefault(dataSource)).andReturn(rules).once();
+    EasyMock.expect(inventoryView.getTimeline(new TableDataSource(dataSource))).andReturn(timeline).once();
+    EasyMock.replay(inventoryView, ruleManager);
+
+    final Response loadedResponse = resource.isHandOffComplete(dataSource, loadedInterval, 1, "v1");
+    Assert.assertTrue((boolean) loadedResponse.getEntity());
+    EasyMock.verify(inventoryView, ruleManager);
+  }
+
+  /**
+   * Verifies the version check of {@code DatasourcesResource.isSegmentLoaded}. Every case uses
+   * partition 2 of the same interval, served by a single historical server:
+   * <ul>
+   * <li>served "v1", requested "v2": not loaded</li>
+   * <li>served "v2", requested "v1": loaded</li>
+   * <li>served "v1", requested "v1": loaded</li>
+   * </ul>
+   */
+  @Test
+  public void testSegmentLoadChecksForVersion()
+  {
+    final Interval interval = Intervals.of("2011-04-01/2011-04-02");
+    final SegmentDescriptor requestedV1 = new SegmentDescriptor(interval, "v1", 2);
+    final SegmentDescriptor requestedV2 = new SegmentDescriptor(interval, "v2", 2);
+
+    // "v1" is served but "v2" is requested: not loaded.
+    final ImmutableSegmentLoadInfo servedV1 = new ImmutableSegmentLoadInfo(
+        createSegment(interval, "v1", 2),
+        Sets.newHashSet(createHistoricalServerMetadata("a"))
+    );
+    Assert.assertFalse(
+        DatasourcesResource.isSegmentLoaded(Collections.singletonList(servedV1), requestedV2)
+    );
+
+    // "v2" is served and "v1" is requested: loaded.
+    final ImmutableSegmentLoadInfo servedV2 = new ImmutableSegmentLoadInfo(
+        createSegment(interval, "v2", 2),
+        Sets.newHashSet(createHistoricalServerMetadata("a"))
+    );
+    Assert.assertTrue(
+        DatasourcesResource.isSegmentLoaded(Collections.singletonList(servedV2), requestedV1)
+    );
+
+    // Served and requested versions are both "v1": loaded.
+    final ImmutableSegmentLoadInfo servedSameVersion = new ImmutableSegmentLoadInfo(
+        createSegment(interval, "v1", 2),
+        Sets.newHashSet(createHistoricalServerMetadata("a"))
+    );
+    Assert.assertTrue(
+        DatasourcesResource.isSegmentLoaded(Collections.singletonList(servedSameVersion), requestedV1)
+    );
+  }
+
+  /**
+   * Verifies that {@code DatasourcesResource.isSegmentLoaded} takes the type of the serving server
+   * into account. Both cases look up partition 2 of version "v1" for the same interval: the segment
+   * counts as loaded when a historical server serves it, but not when only a realtime server
+   * serves it.
+   */
+  @Test
+  public void testSegmentLoadChecksForAssignableServer()
+  {
+    final Interval interval = Intervals.of("2011-04-01/2011-04-02");
+    final SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
+
+    // Served by a historical server: loaded.
+    final ImmutableSegmentLoadInfo onHistorical = new ImmutableSegmentLoadInfo(
+        createSegment(interval, "v1", 2),
+        Sets.newHashSet(createHistoricalServerMetadata("a"))
+    );
+    Assert.assertTrue(
+        DatasourcesResource.isSegmentLoaded(Collections.singletonList(onHistorical), descriptor)
+    );
+
+    // Served only by a realtime server: not loaded.
+    final ImmutableSegmentLoadInfo onRealtime = new ImmutableSegmentLoadInfo(
+        createSegment(interval, "v1", 2),
+        Sets.newHashSet(createRealtimeServerMetadata("a"))
+    );
+    Assert.assertFalse(
+        DatasourcesResource.isSegmentLoaded(Collections.singletonList(onRealtime), descriptor)
+    );
+  }
+
+  /**
+   * Verifies that {@code DatasourcesResource.isSegmentLoaded} matches on partition number. With
+   * partition 1 of version "v1" served by a historical server, a descriptor for partition 1 of
+   * the same interval and version is reported as loaded, while a descriptor for partition 2 is
+   * reported as not loaded.
+   */
+  @Test
+  public void testSegmentLoadChecksForPartitionNumber()
+  {
+    final Interval interval = Intervals.of("2011-04-01/2011-04-02");
+
+    // Requested partition equals the served partition: loaded.
+    final ImmutableSegmentLoadInfo servedForMatch = new ImmutableSegmentLoadInfo(
+        createSegment(interval, "v1", 1),
+        Sets.newHashSet(createHistoricalServerMetadata("a"))
+    );
+    final SegmentDescriptor samePartition = new SegmentDescriptor(interval, "v1", 1);
+    Assert.assertTrue(
+        DatasourcesResource.isSegmentLoaded(Collections.singletonList(servedForMatch), samePartition)
+    );
+
+    // Requested partition 2 differs from the served partition 1: not loaded.
+    final ImmutableSegmentLoadInfo servedForMismatch = new ImmutableSegmentLoadInfo(
+        createSegment(interval, "v1", 1),
+        Sets.newHashSet(createHistoricalServerMetadata("a"))
+    );
+    final SegmentDescriptor otherPartition = new SegmentDescriptor(interval, "v1", 2);
+    Assert.assertFalse(
+        DatasourcesResource.isSegmentLoaded(Collections.singletonList(servedForMismatch), otherPartition)
+    );
+  }
+
+  /**
+   * Verifies the interval check of {@code DatasourcesResource.isSegmentLoaded}: a served segment
+   * does not satisfy a descriptor whose interval extends past the segment's end, but does satisfy
+   * a descriptor whose interval lies within the segment's interval.
+   */
+  @Test
+  public void testSegmentLoadChecksForInterval()
+  {
+    // Requested 2011-04-01/2011-04-03 extends beyond the served 2011-04-01/2011-04-02: not loaded.
+    final ImmutableSegmentLoadInfo servedOneDay = new ImmutableSegmentLoadInfo(
+        createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1),
+        Sets.newHashSet(createHistoricalServerMetadata("a"))
+    );
+    final SegmentDescriptor twoDays = new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1);
+    Assert.assertFalse(
+        DatasourcesResource.isSegmentLoaded(Collections.singletonList(servedOneDay), twoDays)
+    );
+
+    // Requested 2011-04-02/2011-04-03 lies within the served 2011-04-01/2011-04-04: loaded.
+    final ImmutableSegmentLoadInfo servedThreeDays = new ImmutableSegmentLoadInfo(
+        createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1),
+        Sets.newHashSet(createHistoricalServerMetadata("a"))
+    );
+    final SegmentDescriptor middleDay = new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1);
+    Assert.assertTrue(
+        DatasourcesResource.isSegmentLoaded(Collections.singletonList(servedThreeDays), middleDay)
+    );
+  }
+
+  private DruidServerMetadata createRealtimeServerMetadata(String name)
+  {
+    return createServerMetadata(name, ServerType.REALTIME); // same fixture, with server type REALTIME
+  }
+
+  private DruidServerMetadata createHistoricalServerMetadata(String name)
+  {
+    return createServerMetadata(name, ServerType.HISTORICAL); // same fixture, with server type HISTORICAL
+  }
+
+  /**
+   * Creates a {@link DruidServerMetadata} fixture of the given type. {@code name} is supplied for
+   * both of the first two constructor arguments; all other arguments are fixed constants.
+   */
+  private DruidServerMetadata createServerMetadata(String name, ServerType type)
+  {
+    // NOTE(review): local names assume the (..., maxSize, type, tier, priority) argument order -- confirm.
+    final int maxSize = 10000;
+    final String tier = "tier";
+    final int priority = 1;
+    return new DruidServerMetadata(name, name, null, maxSize, type, tier, priority);
+  }
+
+  /**
+   * Creates a segment of datasource "test_ds" for the given interval and version.
+   *
+   * The shard spec is a {@link NumberedShardSpec} built from {@code partitionNumber} and a fixed
+   * second argument of 100. The three {@code null} and two {@code 0} constructor arguments are
+   * fixed values that this helper does not vary.
+   */
+  private DataSegment createSegment(Interval interval, String version, int partitionNumber)
+  {
+    final NumberedShardSpec shardSpec = new NumberedShardSpec(partitionNumber, 100);
+
+    return new DataSegment("test_ds", interval, version, null, null, null, shardSpec, 0, 0);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org