You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/01/07 22:43:14 UTC

[GitHub] jihoonson closed pull request #6676: Handoff should ignore segments that are dropped by drop rules

jihoonson closed pull request #6676: Handoff should ignore segments that are dropped by drop rules
URL: https://github.com/apache/incubator-druid/pull/6676
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index 4f59d87a849..ef731b31869 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -23,16 +23,13 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Throwables;
 import com.google.inject.Inject;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.http.client.response.FullResponseHolder;
+import org.apache.druid.query.SegmentDescriptor;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.joda.time.Interval;
-
-import java.util.List;
 
 public class CoordinatorClient
 {
@@ -49,18 +46,20 @@ public CoordinatorClient(
     this.druidLeaderClient = druidLeaderClient;
   }
 
-
-  public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk)
+  public boolean isHandOffComplete(String dataSource, SegmentDescriptor descriptor)
   {
     try {
       FullResponseHolder response = druidLeaderClient.go(
-          druidLeaderClient.makeRequest(HttpMethod.GET,
-                                        StringUtils.format(
-                                           "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
-                                           dataSource,
-                                           interval.toString().replace('/', '_'),
-                                           incompleteOk
-                                       ))
+          druidLeaderClient.makeRequest(
+              HttpMethod.GET,
+              StringUtils.format(
+                  "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s",
+                  dataSource,
+                  descriptor.getInterval(),
+                  descriptor.getPartitionNumber(),
+                  descriptor.getVersion()
+              )
+          )
       );
 
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -70,12 +69,9 @@ public CoordinatorClient(
             response.getContent()
         );
       }
-      return jsonMapper.readValue(
-          response.getContent(), new TypeReference<List<ImmutableSegmentLoadInfo>>()
-          {
-
-          }
-      );
+      return jsonMapper.readValue(response.getContent(), new TypeReference<Boolean>()
+      {
+      });
     }
     catch (Exception e) {
       throw Throwables.propagate(e);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
index 6d062151e83..028183f943b 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
@@ -19,18 +19,13 @@
 
 package org.apache.druid.segment.realtime.plumber;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.server.coordination.DruidServerMetadata;
 
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -95,13 +90,7 @@ void checkForSegmentHandoffs()
         Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next();
         SegmentDescriptor descriptor = entry.getKey();
         try {
-          List<ImmutableSegmentLoadInfo> loadedSegments = coordinatorClient.fetchServerView(
-              dataSource,
-              descriptor.getInterval(),
-              true
-          );
-
-          if (isHandOffComplete(loadedSegments, entry.getKey())) {
+          if (coordinatorClient.isHandOffComplete(dataSource, descriptor)) {
             log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor);
             entry.getValue().lhs.execute(entry.getValue().rhs);
             itr.remove();
@@ -131,30 +120,6 @@ void checkForSegmentHandoffs()
     }
   }
 
-
-  static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
-  {
-    for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
-      if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
-          && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
-             == descriptor.getPartitionNumber()
-          && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
-          && Iterables.any(
-          segmentLoadInfo.getServers(), new Predicate<DruidServerMetadata>()
-          {
-            @Override
-            public boolean apply(DruidServerMetadata input)
-            {
-              return input.segmentReplicatable();
-            }
-          }
-      )) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   @Override
   public void close()
   {
diff --git a/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
index 450dbd4af08..03d3e6ad198 100644
--- a/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
@@ -37,8 +37,13 @@
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.guava.FunctionalIterable;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.metadata.MetadataSegmentManager;
+import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordinator.rules.LoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.http.security.DatasourceResourceFilter;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -85,6 +90,7 @@
 
   private final CoordinatorServerView serverInventoryView;
   private final MetadataSegmentManager databaseSegmentManager;
+  private final MetadataRuleManager databaseRuleManager;
   private final IndexingServiceClient indexingServiceClient;
   private final AuthConfig authConfig;
   private final AuthorizerMapper authorizerMapper;
@@ -93,6 +99,7 @@
   public DatasourcesResource(
       CoordinatorServerView serverInventoryView,
       MetadataSegmentManager databaseSegmentManager,
+      MetadataRuleManager databaseRuleManager,
       @Nullable IndexingServiceClient indexingServiceClient,
       AuthConfig authConfig,
       AuthorizerMapper authorizerMapper
@@ -100,6 +107,7 @@ public DatasourcesResource(
   {
     this.serverInventoryView = serverInventoryView;
     this.databaseSegmentManager = databaseSegmentManager;
+    this.databaseRuleManager = databaseRuleManager;
     this.indexingServiceClient = indexingServiceClient;
     this.authConfig = authConfig;
     this.authorizerMapper = authorizerMapper;
@@ -647,4 +655,85 @@ public Response getSegmentDataSourceSpecificInterval(
         );
     return Response.ok(retval).build();
   }
+
+  /**
+   * Used by the realtime tasks to learn whether a segment is handed off or not.
+   * It returns true when the segment will never be handed off or is already handed off. Otherwise, it returns false.
+   */
+  @GET
+  @Path("/{dataSourceName}/handoffComplete")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response isHandOffComplete(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") final String interval,
+      @QueryParam("partitionNumber") final int partitionNumber,
+      @QueryParam("version") final String version
+  )
+  {
+    try {
+      final List<Rule> rules = databaseRuleManager.getRulesWithDefault(dataSourceName);
+      final Interval theInterval = Intervals.of(interval);
+      final SegmentDescriptor descriptor = new SegmentDescriptor(theInterval, version, partitionNumber);
+      final DateTime now = DateTimes.nowUtc();
+      // dropped means a segment will never be handed off, i.e it completed hand off
+      // init to true, reset to false only if this segment can be loaded by rules
+      boolean dropped = true;
+      for (Rule rule : rules) {
+        if (rule.appliesTo(theInterval, now)) {
+          if (rule instanceof LoadRule) {
+            dropped = false;
+          }
+          break;
+        }
+      }
+      if (dropped) {
+        return Response.ok(true).build();
+      }
+
+      TimelineLookup<String, SegmentLoadInfo> timeline = serverInventoryView.getTimeline(
+          new TableDataSource(dataSourceName)
+      );
+      if (timeline == null) {
+        log.debug("No timeline found for datasource[%s]", dataSourceName);
+        return Response.ok(false).build();
+      }
+
+      Iterable<TimelineObjectHolder<String, SegmentLoadInfo>> lookup = timeline.lookupWithIncompletePartitions(
+          theInterval);
+      FunctionalIterable<ImmutableSegmentLoadInfo> loadInfoIterable = FunctionalIterable
+          .create(lookup).transformCat(
+              (TimelineObjectHolder<String, SegmentLoadInfo> input) ->
+                  Iterables.transform(
+                      input.getObject(),
+                      (PartitionChunk<SegmentLoadInfo> chunk) ->
+                          chunk.getObject().toImmutableSegmentLoadInfo()
+                  )
+          );
+      if (isSegmentLoaded(loadInfoIterable, descriptor)) {
+        return Response.ok(true).build();
+      }
+
+      return Response.ok(false).build();
+    }
+    catch (Exception e) {
+      log.error(e, "Error while handling hand off check request");
+      return Response.serverError().entity(ImmutableMap.of("error", e.toString())).build();
+    }
+  }
+
+  static boolean isSegmentLoaded(Iterable<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
+  {
+    for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
+      if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
+          && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber()
+          && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
+          && Iterables.any(
+          segmentLoadInfo.getServers(), DruidServerMetadata::segmentReplicatable
+      )) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
index 15964158073..228d7957843 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
@@ -19,23 +19,16 @@
 
 package org.apache.druid.segment.realtime.plumber;
 
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
-import junit.framework.Assert;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
+import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class CoordinatorBasedSegmentHandoffNotifierTest
@@ -55,27 +48,10 @@ public void testHandoffCallbackNotCalled()
   {
     Interval interval = Intervals.of("2011-04-01/2011-04-02");
     SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
-    DataSegment segment = new DataSegment(
-        "test_ds",
-        interval,
-        "v1",
-        null,
-        null,
-        null,
-        new NumberedShardSpec(2, 3),
-        0, 0
-    );
 
     CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
-    EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true))
-            .andReturn(
-                Collections.singletonList(
-                    new ImmutableSegmentLoadInfo(
-                        segment,
-                        Sets.newHashSet(createRealtimeServerMetadata("a1"))
-                    )
-                )
-            )
+    EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor))
+            .andReturn(false)
             .anyTimes();
     EasyMock.replay(coordinatorClient);
     CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
@@ -102,27 +78,11 @@ public void testHandoffCallbackCalled()
   {
     Interval interval = Intervals.of("2011-04-01/2011-04-02");
     SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
-    DataSegment segment = new DataSegment(
-        "test_ds",
-        interval,
-        "v1",
-        null,
-        null,
-        null,
-        new NumberedShardSpec(2, 3),
-        0, 0
-    );
+
     final AtomicBoolean callbackCalled = new AtomicBoolean(false);
     CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
-    EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true))
-            .andReturn(
-                Collections.singletonList(
-                    new ImmutableSegmentLoadInfo(
-                        segment,
-                        Sets.newHashSet(createHistoricalServerMetadata("a1"))
-                    )
-                )
-            )
+    EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor))
+            .andReturn(true)
             .anyTimes();
     EasyMock.replay(coordinatorClient);
     CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
@@ -144,177 +104,4 @@ public void testHandoffCallbackCalled()
     Assert.assertTrue(callbackCalled.get());
     EasyMock.verify(coordinatorClient);
   }
-
-  @Test
-  public void testHandoffChecksForVersion()
-  {
-    Interval interval = Intervals.of(
-        "2011-04-01/2011-04-02"
-    );
-    Assert.assertFalse(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 2),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v2", 2)
-        )
-    );
-
-    Assert.assertTrue(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v2", 2),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 2)
-        )
-    );
-
-    Assert.assertTrue(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 2),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 2)
-        )
-    );
-
-  }
-
-  @Test
-  public void testHandoffChecksForAssignableServer()
-  {
-    Interval interval = Intervals.of(
-        "2011-04-01/2011-04-02"
-    );
-    Assert.assertTrue(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 2),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 2)
-        )
-    );
-
-    Assert.assertFalse(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 2),
-                    Sets.newHashSet(createRealtimeServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 2)
-        )
-    );
-  }
-
-  @Test
-  public void testHandoffChecksForPartitionNumber()
-  {
-    Interval interval = Intervals.of(
-        "2011-04-01/2011-04-02"
-    );
-    Assert.assertTrue(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 1),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 1)
-        )
-    );
-
-    Assert.assertFalse(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(interval, "v1", 1),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(interval, "v1", 2)
-        )
-    );
-
-  }
-
-  @Test
-  public void testHandoffChecksForInterval()
-  {
-
-    Assert.assertFalse(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1)
-        )
-    );
-
-    Assert.assertTrue(
-        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
-            Collections.singletonList(
-                new ImmutableSegmentLoadInfo(
-                    createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1),
-                    Sets.newHashSet(createHistoricalServerMetadata("a"))
-                )
-            ),
-            new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1)
-        )
-    );
-  }
-
-  private DruidServerMetadata createRealtimeServerMetadata(String name)
-  {
-    return createServerMetadata(name, ServerType.REALTIME);
-  }
-
-  private DruidServerMetadata createHistoricalServerMetadata(String name)
-  {
-    return createServerMetadata(name, ServerType.HISTORICAL);
-  }
-
-  private DruidServerMetadata createServerMetadata(String name, ServerType type)
-  {
-    return new DruidServerMetadata(
-        name,
-        name,
-        null,
-        10000,
-        type,
-        "tier",
-        1
-    );
-  }
-
-  private DataSegment createSegment(Interval interval, String version, int partitionNumber)
-  {
-    return new DataSegment(
-        "test_ds",
-        interval,
-        version,
-        null,
-        null,
-        null,
-        new NumberedShardSpec(partitionNumber, 100),
-        0, 0
-    );
-  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
index d42b6783f5d..3d2c78654bd 100644
--- a/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
@@ -21,13 +21,23 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.druid.client.CoordinatorServerView;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.SegmentLoadInfo;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.rules.IntervalDropRule;
+import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthConfig;
@@ -37,6 +47,11 @@
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineObjectHolder;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.NumberedPartitionChunk;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.PartitionHolder;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.Assert;
@@ -46,6 +61,7 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -156,6 +172,7 @@ public void testGetFullQueryableDataSources()
         inventoryView,
         null,
         null,
+        null,
         new AuthConfig(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER
     );
@@ -240,6 +257,7 @@ public Access authorize(AuthenticationResult authenticationResult1, Resource res
         inventoryView,
         null,
         null,
+        null,
         new AuthConfig(),
         authMapper
     );
@@ -294,6 +312,7 @@ public void testGetSimpleQueryableDataSources()
         inventoryView,
         null,
         null,
+        null,
         new AuthConfig(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER
     );
@@ -323,7 +342,7 @@ public void testFullGetTheDataSource()
     ).atLeastOnce();
 
     EasyMock.replay(inventoryView, server);
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
     Response response = datasourcesResource.getTheDataSource("datasource1", "full");
     ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity();
     Assert.assertEquals(200, response.getStatus());
@@ -340,7 +359,7 @@ public void testNullGetTheDataSource()
     ).atLeastOnce();
 
     EasyMock.replay(inventoryView, server);
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
     Assert.assertEquals(204, datasourcesResource.getTheDataSource("none", null).getStatus());
     EasyMock.verify(inventoryView, server);
   }
@@ -361,7 +380,7 @@ public void testSimpleGetTheDataSource()
     ).atLeastOnce();
 
     EasyMock.replay(inventoryView, server);
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
     Response response = datasourcesResource.getTheDataSource("datasource1", null);
     Assert.assertEquals(200, response.getStatus());
     Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -400,7 +419,7 @@ public void testSimpleGetTheDataSourceManyTiers()
     ).atLeastOnce();
 
     EasyMock.replay(inventoryView, server, server2, server3);
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
     Response response = datasourcesResource.getTheDataSource("datasource1", null);
     Assert.assertEquals(200, response.getStatus());
     Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -431,7 +450,7 @@ public void testGetSegmentDataSourceIntervals()
     List<Interval> expectedIntervals = new ArrayList<>();
     expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z"));
     expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z"));
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
 
     Response response = datasourcesResource.getSegmentDataSourceIntervals("invalidDataSource", null, null);
     Assert.assertEquals(response.getEntity(), null);
@@ -478,7 +497,7 @@ public void testGetSegmentDataSourceSpecificInterval()
     ).atLeastOnce();
     EasyMock.replay(inventoryView);
 
-    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+    DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
     Response response = datasourcesResource.getSegmentDataSourceSpecificInterval(
         "invalidDataSource",
         "2010-01-01/P1D",
@@ -548,6 +567,7 @@ public void testDeleteDataSourceSpecificInterval()
     DatasourcesResource datasourcesResource = new DatasourcesResource(
         inventoryView,
         null,
+        null,
         indexingServiceClient,
         new AuthConfig(),
         null
@@ -567,6 +587,7 @@ public void testDeleteDataSource()
     DatasourcesResource datasourcesResource = new DatasourcesResource(
         inventoryView,
         null,
+        null,
         indexingServiceClient,
         new AuthConfig(),
         null
@@ -579,4 +600,254 @@ public void testDeleteDataSource()
     EasyMock.verify(indexingServiceClient, server);
   }
 
+  @Test
+  public void testIsHandOffComplete()
+  {
+    MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
+    Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null);
+    Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z"));
+    DatasourcesResource datasourcesResource = new DatasourcesResource(
+        inventoryView,
+        null,
+        databaseRuleManager,
+        null,
+        new AuthConfig(),
+        null
+    );
+
+    // test dropped
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
+            .andReturn(ImmutableList.of(loadRule, dropRule))
+            .once();
+    EasyMock.replay(databaseRuleManager);
+
+    String interval1 = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z";
+    Response response1 = datasourcesResource.isHandOffComplete("dataSource1", interval1, 1, "v1");
+    Assert.assertTrue((boolean) response1.getEntity());
+
+    EasyMock.verify(databaseRuleManager);
+
+    // test isn't dropped and no timeline found
+    EasyMock.reset(databaseRuleManager);
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
+            .andReturn(ImmutableList.of(loadRule, dropRule))
+            .once();
+    EasyMock.expect(inventoryView.getTimeline(new TableDataSource("dataSource1")))
+            .andReturn(null)
+            .once();
+    EasyMock.replay(inventoryView, databaseRuleManager);
+
+    String interval2 = "2013-01-02T01:00:00Z/2013-01-02T02:00:00Z";
+    Response response2 = datasourcesResource.isHandOffComplete("dataSource1", interval2, 1, "v1");
+    Assert.assertFalse((boolean) response2.getEntity());
+
+    EasyMock.verify(inventoryView, databaseRuleManager);
+
+    // test isn't dropped and timeline exist
+    String interval3 = "2013-01-02T02:00:00Z/2013-01-02T03:00:00Z";
+    SegmentLoadInfo segmentLoadInfo = new SegmentLoadInfo(createSegment(Intervals.of(interval3), "v1", 1));
+    segmentLoadInfo.addServer(createHistoricalServerMetadata("test"));
+    VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = new VersionedIntervalTimeline<String, SegmentLoadInfo>(
+        null)
+    {
+      @Override
+      public List<TimelineObjectHolder<String, SegmentLoadInfo>> lookupWithIncompletePartitions(Interval interval)
+      {
+        PartitionHolder<SegmentLoadInfo> partitionHolder = new PartitionHolder<>(new NumberedPartitionChunk<>(
+            1,
+            1,
+            segmentLoadInfo
+        ));
+        List<TimelineObjectHolder<String, SegmentLoadInfo>> ret = new ArrayList<>();
+        ret.add(new TimelineObjectHolder<>(Intervals.of(interval3), "v1", partitionHolder));
+        return ret;
+      }
+    };
+    EasyMock.reset(inventoryView, databaseRuleManager);
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
+            .andReturn(ImmutableList.of(loadRule, dropRule))
+            .once();
+    EasyMock.expect(inventoryView.getTimeline(new TableDataSource("dataSource1")))
+            .andReturn(timeline)
+            .once();
+    EasyMock.replay(inventoryView, databaseRuleManager);
+
+    Response response3 = datasourcesResource.isHandOffComplete("dataSource1", interval3, 1, "v1");
+    Assert.assertTrue((boolean) response3.getEntity());
+
+    EasyMock.verify(inventoryView, databaseRuleManager);
+  }
+
+  @Test
+  public void testSegmentLoadChecksForVersion()
+  {
+    Interval interval = Intervals.of(
+        "2011-04-01/2011-04-02"
+    );
+    Assert.assertFalse(
+        DatasourcesResource.isSegmentLoaded(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 2),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v2", 2)
+        )
+    );
+
+    Assert.assertTrue(
+        DatasourcesResource.isSegmentLoaded(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v2", 2),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 2)
+        )
+    );
+
+    Assert.assertTrue(
+        DatasourcesResource.isSegmentLoaded(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 2),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 2)
+        )
+    );
+
+  }
+
+  @Test
+  public void testSegmentLoadChecksForAssignableServer()
+  {
+    Interval interval = Intervals.of(
+        "2011-04-01/2011-04-02"
+    );
+    Assert.assertTrue(
+        DatasourcesResource.isSegmentLoaded(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 2),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 2)
+        )
+    );
+
+    Assert.assertFalse(
+        DatasourcesResource.isSegmentLoaded(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 2),
+                    Sets.newHashSet(createRealtimeServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 2)
+        )
+    );
+  }
+
+  @Test
+  public void testSegmentLoadChecksForPartitionNumber()
+  {
+    Interval interval = Intervals.of(
+        "2011-04-01/2011-04-02"
+    );
+    Assert.assertTrue(
+        DatasourcesResource.isSegmentLoaded(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 1),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 1)
+        )
+    );
+
+    Assert.assertFalse(
+        DatasourcesResource.isSegmentLoaded(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 1),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 2)
+        )
+    );
+
+  }
+
+  @Test
+  public void testSegmentLoadChecksForInterval()
+  {
+
+    Assert.assertFalse(
+        DatasourcesResource.isSegmentLoaded(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1)
+        )
+    );
+
+    Assert.assertTrue(
+        DatasourcesResource.isSegmentLoaded(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1)
+        )
+    );
+  }
+
+  private DruidServerMetadata createRealtimeServerMetadata(String name)
+  {
+    return createServerMetadata(name, ServerType.REALTIME);
+  }
+
+  private DruidServerMetadata createHistoricalServerMetadata(String name)
+  {
+    return createServerMetadata(name, ServerType.HISTORICAL);
+  }
+
+  private DruidServerMetadata createServerMetadata(String name, ServerType type)
+  {
+    return new DruidServerMetadata(
+        name,
+        name,
+        null,
+        10000,
+        type,
+        "tier",
+        1
+    );
+  }
+
+  private DataSegment createSegment(Interval interval, String version, int partitionNumber)
+  {
+    return new DataSegment(
+        "test_ds",
+        interval,
+        version,
+        null,
+        null,
+        null,
+        new NumberedShardSpec(partitionNumber, 100),
+        0, 0
+    );
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org