You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/01/07 22:43:18 UTC
[incubator-druid] branch master updated: Handoff should ignore
segments that are dropped by drop rules (#6676)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8ebb7b5 Handoff should ignore segments that are dropped by drop rules (#6676)
8ebb7b5 is described below
commit 8ebb7b558b617d8807e6bdc204cabbda9a4ac346
Author: Mingming Qiu <cs...@gmail.com>
AuthorDate: Tue Jan 8 06:43:11 2019 +0800
Handoff should ignore segments that are dropped by drop rules (#6676)
* Handoff should ignore segments that are dropped by drop rules
* fix travis-ci
* fix tests
* address comments
* remove line added by accident
* address comments
* add javadoc and logging the full stack trace of exception
* add error message
---
.../client/coordinator/CoordinatorClient.java | 34 ++-
.../CoordinatorBasedSegmentHandoffNotifier.java | 37 +--
.../druid/server/http/DatasourcesResource.java | 89 +++++++
...CoordinatorBasedSegmentHandoffNotifierTest.java | 225 +---------------
.../druid/server/http/DatasourcesResourceTest.java | 283 ++++++++++++++++++++-
5 files changed, 388 insertions(+), 280 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index 4f59d87..ef731b3 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -23,16 +23,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.response.FullResponseHolder;
+import org.apache.druid.query.SegmentDescriptor;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.joda.time.Interval;
-
-import java.util.List;
public class CoordinatorClient
{
@@ -49,18 +46,20 @@ public class CoordinatorClient
this.druidLeaderClient = druidLeaderClient;
}
-
- public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk)
+ public boolean isHandOffComplete(String dataSource, SegmentDescriptor descriptor)
{
try {
FullResponseHolder response = druidLeaderClient.go(
- druidLeaderClient.makeRequest(HttpMethod.GET,
- StringUtils.format(
- "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
- dataSource,
- interval.toString().replace('/', '_'),
- incompleteOk
- ))
+ druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ StringUtils.format(
+ "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s",
+ dataSource,
+ descriptor.getInterval(),
+ descriptor.getPartitionNumber(),
+ descriptor.getVersion()
+ )
+ )
);
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -70,12 +69,9 @@ public class CoordinatorClient
response.getContent()
);
}
- return jsonMapper.readValue(
- response.getContent(), new TypeReference<List<ImmutableSegmentLoadInfo>>()
- {
-
- }
- );
+ return jsonMapper.readValue(response.getContent(), new TypeReference<Boolean>()
+ {
+ });
}
catch (Exception e) {
throw Throwables.propagate(e);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
index 6d06215..028183f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
@@ -19,18 +19,13 @@
package org.apache.druid.segment.realtime.plumber;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.server.coordination.DruidServerMetadata;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -95,13 +90,7 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next();
SegmentDescriptor descriptor = entry.getKey();
try {
- List<ImmutableSegmentLoadInfo> loadedSegments = coordinatorClient.fetchServerView(
- dataSource,
- descriptor.getInterval(),
- true
- );
-
- if (isHandOffComplete(loadedSegments, entry.getKey())) {
+ if (coordinatorClient.isHandOffComplete(dataSource, descriptor)) {
log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor);
entry.getValue().lhs.execute(entry.getValue().rhs);
itr.remove();
@@ -131,30 +120,6 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
}
}
-
- static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
- {
- for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
- if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
- && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
- == descriptor.getPartitionNumber()
- && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
- && Iterables.any(
- segmentLoadInfo.getServers(), new Predicate<DruidServerMetadata>()
- {
- @Override
- public boolean apply(DruidServerMetadata input)
- {
- return input.segmentReplicatable();
- }
- }
- )) {
- return true;
- }
- }
- return false;
- }
-
@Override
public void close()
{
diff --git a/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
index 450dbd4..03d3e6a 100644
--- a/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
@@ -37,8 +37,13 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.MetadataSegmentManager;
+import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordinator.rules.LoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.http.security.DatasourceResourceFilter;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -85,6 +90,7 @@ public class DatasourcesResource
private final CoordinatorServerView serverInventoryView;
private final MetadataSegmentManager databaseSegmentManager;
+ private final MetadataRuleManager databaseRuleManager;
private final IndexingServiceClient indexingServiceClient;
private final AuthConfig authConfig;
private final AuthorizerMapper authorizerMapper;
@@ -93,6 +99,7 @@ public class DatasourcesResource
public DatasourcesResource(
CoordinatorServerView serverInventoryView,
MetadataSegmentManager databaseSegmentManager,
+ MetadataRuleManager databaseRuleManager,
@Nullable IndexingServiceClient indexingServiceClient,
AuthConfig authConfig,
AuthorizerMapper authorizerMapper
@@ -100,6 +107,7 @@ public class DatasourcesResource
{
this.serverInventoryView = serverInventoryView;
this.databaseSegmentManager = databaseSegmentManager;
+ this.databaseRuleManager = databaseRuleManager;
this.indexingServiceClient = indexingServiceClient;
this.authConfig = authConfig;
this.authorizerMapper = authorizerMapper;
@@ -647,4 +655,85 @@ public class DatasourcesResource
);
return Response.ok(retval).build();
}
+
+ /**
+ * Used by the realtime tasks to learn whether a segment is handed off or not.
+ * It returns true when the segment will never be handed off or is already handed off. Otherwise, it returns false.
+ */
+ @GET
+ @Path("/{dataSourceName}/handoffComplete")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(DatasourceResourceFilter.class)
+ public Response isHandOffComplete(
+ @PathParam("dataSourceName") String dataSourceName,
+ @QueryParam("interval") final String interval,
+ @QueryParam("partitionNumber") final int partitionNumber,
+ @QueryParam("version") final String version
+ )
+ {
+ try {
+ final List<Rule> rules = databaseRuleManager.getRulesWithDefault(dataSourceName);
+ final Interval theInterval = Intervals.of(interval);
+ final SegmentDescriptor descriptor = new SegmentDescriptor(theInterval, version, partitionNumber);
+ final DateTime now = DateTimes.nowUtc();
+ // "dropped" means the segment will never be handed off, i.e. it is treated as having completed hand off.
+ // Starts as true; reset to false only if the first rule that applies to the interval is a load rule.
+ boolean dropped = true;
+ for (Rule rule : rules) {
+ if (rule.appliesTo(theInterval, now)) {
+ if (rule instanceof LoadRule) {
+ dropped = false;
+ }
+ break;
+ }
+ }
+ if (dropped) {
+ return Response.ok(true).build();
+ }
+
+ TimelineLookup<String, SegmentLoadInfo> timeline = serverInventoryView.getTimeline(
+ new TableDataSource(dataSourceName)
+ );
+ if (timeline == null) {
+ log.debug("No timeline found for datasource[%s]", dataSourceName);
+ return Response.ok(false).build();
+ }
+
+ Iterable<TimelineObjectHolder<String, SegmentLoadInfo>> lookup = timeline.lookupWithIncompletePartitions(
+ theInterval);
+ FunctionalIterable<ImmutableSegmentLoadInfo> loadInfoIterable = FunctionalIterable
+ .create(lookup).transformCat(
+ (TimelineObjectHolder<String, SegmentLoadInfo> input) ->
+ Iterables.transform(
+ input.getObject(),
+ (PartitionChunk<SegmentLoadInfo> chunk) ->
+ chunk.getObject().toImmutableSegmentLoadInfo()
+ )
+ );
+ if (isSegmentLoaded(loadInfoIterable, descriptor)) {
+ return Response.ok(true).build();
+ }
+
+ return Response.ok(false).build();
+ }
+ catch (Exception e) {
+ log.error(e, "Error while handling hand off check request");
+ return Response.serverError().entity(ImmutableMap.of("error", e.toString())).build();
+ }
+ }
+
+ static boolean isSegmentLoaded(Iterable<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
+ {
+ for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
+ if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
+ && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber()
+ && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
+ && Iterables.any(
+ segmentLoadInfo.getServers(), DruidServerMetadata::segmentReplicatable
+ )) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
index 1596415..228d795 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
@@ -19,23 +19,16 @@
package org.apache.druid.segment.realtime.plumber;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
-import junit.framework.Assert;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
+import org.junit.Assert;
import org.junit.Test;
-import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
public class CoordinatorBasedSegmentHandoffNotifierTest
@@ -55,27 +48,10 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
{
Interval interval = Intervals.of("2011-04-01/2011-04-02");
SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
- DataSegment segment = new DataSegment(
- "test_ds",
- interval,
- "v1",
- null,
- null,
- null,
- new NumberedShardSpec(2, 3),
- 0, 0
- );
CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
- EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true))
- .andReturn(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- segment,
- Sets.newHashSet(createRealtimeServerMetadata("a1"))
- )
- )
- )
+ EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor))
+ .andReturn(false)
.anyTimes();
EasyMock.replay(coordinatorClient);
CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
@@ -102,27 +78,11 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
{
Interval interval = Intervals.of("2011-04-01/2011-04-02");
SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
- DataSegment segment = new DataSegment(
- "test_ds",
- interval,
- "v1",
- null,
- null,
- null,
- new NumberedShardSpec(2, 3),
- 0, 0
- );
+
final AtomicBoolean callbackCalled = new AtomicBoolean(false);
CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
- EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true))
- .andReturn(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- segment,
- Sets.newHashSet(createHistoricalServerMetadata("a1"))
- )
- )
- )
+ EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor))
+ .andReturn(true)
.anyTimes();
EasyMock.replay(coordinatorClient);
CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
@@ -144,177 +104,4 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
Assert.assertTrue(callbackCalled.get());
EasyMock.verify(coordinatorClient);
}
-
- @Test
- public void testHandoffChecksForVersion()
- {
- Interval interval = Intervals.of(
- "2011-04-01/2011-04-02"
- );
- Assert.assertFalse(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 2),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v2", 2)
- )
- );
-
- Assert.assertTrue(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v2", 2),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 2)
- )
- );
-
- Assert.assertTrue(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 2),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 2)
- )
- );
-
- }
-
- @Test
- public void testHandoffChecksForAssignableServer()
- {
- Interval interval = Intervals.of(
- "2011-04-01/2011-04-02"
- );
- Assert.assertTrue(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 2),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 2)
- )
- );
-
- Assert.assertFalse(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 2),
- Sets.newHashSet(createRealtimeServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 2)
- )
- );
- }
-
- @Test
- public void testHandoffChecksForPartitionNumber()
- {
- Interval interval = Intervals.of(
- "2011-04-01/2011-04-02"
- );
- Assert.assertTrue(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 1),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 1)
- )
- );
-
- Assert.assertFalse(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 1),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 2)
- )
- );
-
- }
-
- @Test
- public void testHandoffChecksForInterval()
- {
-
- Assert.assertFalse(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1)
- )
- );
-
- Assert.assertTrue(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1)
- )
- );
- }
-
- private DruidServerMetadata createRealtimeServerMetadata(String name)
- {
- return createServerMetadata(name, ServerType.REALTIME);
- }
-
- private DruidServerMetadata createHistoricalServerMetadata(String name)
- {
- return createServerMetadata(name, ServerType.HISTORICAL);
- }
-
- private DruidServerMetadata createServerMetadata(String name, ServerType type)
- {
- return new DruidServerMetadata(
- name,
- name,
- null,
- 10000,
- type,
- "tier",
- 1
- );
- }
-
- private DataSegment createSegment(Interval interval, String version, int partitionNumber)
- {
- return new DataSegment(
- "test_ds",
- interval,
- version,
- null,
- null,
- null,
- new NumberedShardSpec(partitionNumber, 100),
- 0, 0
- );
- }
}
diff --git a/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
index d42b678..3d2c786 100644
--- a/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
@@ -21,13 +21,23 @@ package org.apache.druid.server.http;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.SegmentLoadInfo;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.rules.IntervalDropRule;
+import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
@@ -37,6 +47,11 @@ import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineObjectHolder;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.NumberedPartitionChunk;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.PartitionHolder;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -46,6 +61,7 @@ import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -156,6 +172,7 @@ public class DatasourcesResourceTest
inventoryView,
null,
null,
+ null,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER
);
@@ -240,6 +257,7 @@ public class DatasourcesResourceTest
inventoryView,
null,
null,
+ null,
new AuthConfig(),
authMapper
);
@@ -294,6 +312,7 @@ public class DatasourcesResourceTest
inventoryView,
null,
null,
+ null,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER
);
@@ -323,7 +342,7 @@ public class DatasourcesResourceTest
).atLeastOnce();
EasyMock.replay(inventoryView, server);
- DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getTheDataSource("datasource1", "full");
ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity();
Assert.assertEquals(200, response.getStatus());
@@ -340,7 +359,7 @@ public class DatasourcesResourceTest
).atLeastOnce();
EasyMock.replay(inventoryView, server);
- DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Assert.assertEquals(204, datasourcesResource.getTheDataSource("none", null).getStatus());
EasyMock.verify(inventoryView, server);
}
@@ -361,7 +380,7 @@ public class DatasourcesResourceTest
).atLeastOnce();
EasyMock.replay(inventoryView, server);
- DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getTheDataSource("datasource1", null);
Assert.assertEquals(200, response.getStatus());
Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -400,7 +419,7 @@ public class DatasourcesResourceTest
).atLeastOnce();
EasyMock.replay(inventoryView, server, server2, server3);
- DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getTheDataSource("datasource1", null);
Assert.assertEquals(200, response.getStatus());
Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -431,7 +450,7 @@ public class DatasourcesResourceTest
List<Interval> expectedIntervals = new ArrayList<>();
expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z"));
expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z"));
- DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getSegmentDataSourceIntervals("invalidDataSource", null, null);
Assert.assertEquals(response.getEntity(), null);
@@ -478,7 +497,7 @@ public class DatasourcesResourceTest
).atLeastOnce();
EasyMock.replay(inventoryView);
- DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getSegmentDataSourceSpecificInterval(
"invalidDataSource",
"2010-01-01/P1D",
@@ -548,6 +567,7 @@ public class DatasourcesResourceTest
DatasourcesResource datasourcesResource = new DatasourcesResource(
inventoryView,
null,
+ null,
indexingServiceClient,
new AuthConfig(),
null
@@ -567,6 +587,7 @@ public class DatasourcesResourceTest
DatasourcesResource datasourcesResource = new DatasourcesResource(
inventoryView,
null,
+ null,
indexingServiceClient,
new AuthConfig(),
null
@@ -579,4 +600,254 @@ public class DatasourcesResourceTest
EasyMock.verify(indexingServiceClient, server);
}
+ @Test
+ public void testIsHandOffComplete()
+ {
+ MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
+ Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null);
+ Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z"));
+ DatasourcesResource datasourcesResource = new DatasourcesResource(
+ inventoryView,
+ null,
+ databaseRuleManager,
+ null,
+ new AuthConfig(),
+ null
+ );
+
+ // test dropped
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
+ .andReturn(ImmutableList.of(loadRule, dropRule))
+ .once();
+ EasyMock.replay(databaseRuleManager);
+
+ String interval1 = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z";
+ Response response1 = datasourcesResource.isHandOffComplete("dataSource1", interval1, 1, "v1");
+ Assert.assertTrue((boolean) response1.getEntity());
+
+ EasyMock.verify(databaseRuleManager);
+
+ // test isn't dropped and no timeline found
+ EasyMock.reset(databaseRuleManager);
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
+ .andReturn(ImmutableList.of(loadRule, dropRule))
+ .once();
+ EasyMock.expect(inventoryView.getTimeline(new TableDataSource("dataSource1")))
+ .andReturn(null)
+ .once();
+ EasyMock.replay(inventoryView, databaseRuleManager);
+
+ String interval2 = "2013-01-02T01:00:00Z/2013-01-02T02:00:00Z";
+ Response response2 = datasourcesResource.isHandOffComplete("dataSource1", interval2, 1, "v1");
+ Assert.assertFalse((boolean) response2.getEntity());
+
+ EasyMock.verify(inventoryView, databaseRuleManager);
+
+ // test isn't dropped and timeline exists
+ String interval3 = "2013-01-02T02:00:00Z/2013-01-02T03:00:00Z";
+ SegmentLoadInfo segmentLoadInfo = new SegmentLoadInfo(createSegment(Intervals.of(interval3), "v1", 1));
+ segmentLoadInfo.addServer(createHistoricalServerMetadata("test"));
+ VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = new VersionedIntervalTimeline<String, SegmentLoadInfo>(
+ null)
+ {
+ @Override
+ public List<TimelineObjectHolder<String, SegmentLoadInfo>> lookupWithIncompletePartitions(Interval interval)
+ {
+ PartitionHolder<SegmentLoadInfo> partitionHolder = new PartitionHolder<>(new NumberedPartitionChunk<>(
+ 1,
+ 1,
+ segmentLoadInfo
+ ));
+ List<TimelineObjectHolder<String, SegmentLoadInfo>> ret = new ArrayList<>();
+ ret.add(new TimelineObjectHolder<>(Intervals.of(interval3), "v1", partitionHolder));
+ return ret;
+ }
+ };
+ EasyMock.reset(inventoryView, databaseRuleManager);
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
+ .andReturn(ImmutableList.of(loadRule, dropRule))
+ .once();
+ EasyMock.expect(inventoryView.getTimeline(new TableDataSource("dataSource1")))
+ .andReturn(timeline)
+ .once();
+ EasyMock.replay(inventoryView, databaseRuleManager);
+
+ Response response3 = datasourcesResource.isHandOffComplete("dataSource1", interval3, 1, "v1");
+ Assert.assertTrue((boolean) response3.getEntity());
+
+ EasyMock.verify(inventoryView, databaseRuleManager);
+ }
+
+ @Test
+ public void testSegmentLoadChecksForVersion()
+ {
+ Interval interval = Intervals.of(
+ "2011-04-01/2011-04-02"
+ );
+ Assert.assertFalse(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 2),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v2", 2)
+ )
+ );
+
+ Assert.assertTrue(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v2", 2),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 2)
+ )
+ );
+
+ Assert.assertTrue(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 2),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 2)
+ )
+ );
+
+ }
+
+ @Test
+ public void testSegmentLoadChecksForAssignableServer()
+ {
+ Interval interval = Intervals.of(
+ "2011-04-01/2011-04-02"
+ );
+ Assert.assertTrue(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 2),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 2)
+ )
+ );
+
+ Assert.assertFalse(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 2),
+ Sets.newHashSet(createRealtimeServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 2)
+ )
+ );
+ }
+
+ @Test
+ public void testSegmentLoadChecksForPartitionNumber()
+ {
+ Interval interval = Intervals.of(
+ "2011-04-01/2011-04-02"
+ );
+ Assert.assertTrue(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 1),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 1)
+ )
+ );
+
+ Assert.assertFalse(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 1),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 2)
+ )
+ );
+
+ }
+
+ @Test
+ public void testSegmentLoadChecksForInterval()
+ {
+
+ Assert.assertFalse(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1)
+ )
+ );
+
+ Assert.assertTrue(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1)
+ )
+ );
+ }
+
+ private DruidServerMetadata createRealtimeServerMetadata(String name)
+ {
+ return createServerMetadata(name, ServerType.REALTIME);
+ }
+
+ private DruidServerMetadata createHistoricalServerMetadata(String name)
+ {
+ return createServerMetadata(name, ServerType.HISTORICAL);
+ }
+
+ private DruidServerMetadata createServerMetadata(String name, ServerType type)
+ {
+ return new DruidServerMetadata(
+ name,
+ name,
+ null,
+ 10000,
+ type,
+ "tier",
+ 1
+ );
+ }
+
+ private DataSegment createSegment(Interval interval, String version, int partitionNumber)
+ {
+ return new DataSegment(
+ "test_ds",
+ interval,
+ version,
+ null,
+ null,
+ null,
+ new NumberedShardSpec(partitionNumber, 100),
+ 0, 0
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org