You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/04/25 21:24:48 UTC

[incubator-druid] branch master updated: API to drop data by interval (#7494)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8308ffe  API to drop data by interval (#7494)
8308ffe is described below

commit 8308ffef1f5b8edd598f92598004ee8a28eb2bcf
Author: Surekha <su...@imply.io>
AuthorDate: Thu Apr 25 14:24:40 2019 -0700

    API to drop data by interval (#7494)
    
    * Add api to drop data by interval
    
    * update to address comments
    
    * unused imports
    
    * PR comments + add tests in SQLMetadataSegmentManagerTest
    
    *  update tests and docs
---
 docs/content/operations/api-reference.md           |  18 ++
 .../druid/metadata/MetadataSegmentManager.java     |   4 +
 .../druid/metadata/SQLMetadataSegmentManager.java  |  56 ++++
 .../druid/server/http/DataSourcesResource.java     |  86 ++++++
 .../metadata/SQLMetadataSegmentManagerTest.java    | 297 +++++++++++++++++++++
 .../druid/server/http/DataSourcesResourceTest.java | 268 +++++++++++++++++++
 6 files changed, 729 insertions(+)

diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md
index f45029a..23b98c1 100644
--- a/docs/content/operations/api-reference.md
+++ b/docs/content/operations/api-reference.md
@@ -220,6 +220,11 @@ Returns full segment metadata for a specific segment in the cluster.
 
 Return the tiers that a datasource exists in.
 
+#### Note for coordinator's POST and DELETE APIs
+The segments would be enabled when these API's are called, but then can be disabled again by the coordinator if any dropRule matches. Segments enabled by these API's might not be loaded by historical processes if no loadRule matches.  If an indexing or kill task runs at the same time as these API's are invoked, the behavior is undefined. Some segments might be killed and others might be enabled. It's also possible that all segments might be disabled but at the same time, the indexing tas [...]
+
+Caution: Avoid running indexing or kill tasks and calling these APIs at the same time for the same datasource and time chunk. (It's fine if the time chunks or datasources don't overlap.)
+
 ##### POST
 
 * `/druid/coordinator/v1/datasources/{dataSourceName}`
@@ -230,6 +235,19 @@ Enables all segments of datasource which are not overshadowed by others.
 
 Enables a segment of a datasource.
 
+* `/druid/coordinator/v1/datasources/{dataSourceName}/markUnused`
+
+Marks segments of a datasource as unused, selected either by interval or by a set of segment IDs. The request payload contains the interval or the set of segment IDs to be marked unused.
+Either `interval` or `segmentIds` must be provided; if both or neither are provided in the payload, the API returns an error (400 BAD REQUEST). The interval specifies the start and end times as ISO 8601 strings, in the form `interval=(start/end)`, where start and end are both inclusive. Only segments completely contained within the specified interval are disabled; partially overlapping segments are not affected.
+
+JSON Request Payload:
+
+|Key|Description|Example|
+|----------|-------------|---------|
+|`interval`|The interval for which to mark segments unused|"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"|
+|`segmentIds`|Set of segment Ids to be marked unused|["segmentId1", "segmentId2"]|
+
+
 ##### DELETE<a name="coordinator-delete"></a>
 
 * `/druid/coordinator/v1/datasources/{dataSourceName}`
diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
index a5584a9..23cd68a 100644
--- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
@@ -53,6 +53,10 @@ public interface MetadataSegmentManager
 
   boolean removeSegment(SegmentId segmentId);
 
+  long disableSegments(String dataSource, Collection<String> segmentIds);
+
+  int disableSegments(String dataSource, Interval interval);
+
   boolean isStarted();
 
   @Nullable
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
index 220c26b..018d4a9 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
@@ -61,6 +61,7 @@ import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -407,6 +408,61 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
     }
   }
 
+  /**
+   * Marks the given segments of a datasource as unused ({@code used=false}) in the segments table.
+   *
+   * <p>The datasource name and segment ids are bound as statement parameters instead of being
+   * formatted into the SQL text: both are supplied by API callers, so quoting them by hand would
+   * allow SQL injection and would break on any value containing a single quote.
+   *
+   * @param dataSource name of the datasource that must own the segments
+   * @param segmentIds ids of the segments to mark unused; an empty collection is a no-op
+   * @return how many of the batched UPDATE statements reported at least one modified row
+   * @throws RuntimeException wrapping any exception raised while running the batch
+   */
+  @Override
+  public long disableSegments(String dataSource, Collection<String> segmentIds)
+  {
+    if (segmentIds.isEmpty()) {
+      return 0;
+    }
+    try {
+      return connector.getDBI().withHandle(handle -> {
+        // Fully qualified so this method does not depend on an additional import.
+        final org.skife.jdbi.v2.PreparedBatch batch = handle.prepareBatch(
+            StringUtils.format(
+                "UPDATE %s SET used=false WHERE datasource = :datasource AND id = :id",
+                getSegmentsTable()
+            )
+        );
+        for (String segmentId : segmentIds) {
+          batch.add().bind("datasource", dataSource).bind("id", segmentId);
+        }
+        // One update count per segment id; count the statements that touched a row.
+        return Arrays.stream(batch.execute()).filter(updated -> updated > 0).count();
+      });
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Marks as unused ({@code used=false}) every segment of a datasource whose start is at or after
+   * the interval start and whose end is at or before the interval end.
+   *
+   * @param dataSource name of the datasource whose segments are updated
+   * @param interval   bounds that a segment must lie within to be updated
+   * @return the row count reported by the UPDATE statement
+   * @throws RuntimeException wrapping any exception raised while running the statement
+   */
+  @Override
+  public int disableSegments(String dataSource, Interval interval)
+  {
+    try {
+      return connector.getDBI().withHandle(handle -> {
+        // Only the "end" column is wrapped in the connector's quote string.
+        final String updateSql = StringUtils.format(
+            "UPDATE %s SET used=false WHERE datasource = :datasource "
+            + "AND start >= :start AND %2$send%2$s <= :end",
+            getSegmentsTable(),
+            connector.getQuoteString()
+        );
+        return handle.createStatement(updateSql)
+                     .bind("datasource", dataSource)
+                     .bind("start", interval.getStart().toString())
+                     .bind("end", interval.getEnd().toString())
+                     .execute();
+      });
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private boolean removeSegmentFromTable(String segmentId)
   {
     final int removed = connector.getDBI().withHandle(
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 0553505..00ba289 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -19,6 +19,9 @@
 
 package org.apache.druid.server.http;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.api.client.repackaged.com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
@@ -686,6 +689,55 @@ public class DataSourcesResource
     }
   }
 
+  @POST
+  @Path("/{dataSourceName}/markUnused")
+  @ResourceFilters(DatasourceResourceFilter.class)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response markDatasourceUnused(
+      @PathParam("dataSourceName") final String dataSourceName,
+      final MarkDatasourceSegmentsPayload payload
+  )
+  {
+    if (payload == null || !payload.isValid()) {
+      return Response.status(Response.Status.BAD_REQUEST)
+                     .entity("Invalid request payload, either interval or segmentIds array must be specified")
+                     .build();
+    }
+
+    final ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
+    if (dataSource == null) {
+      log.warn("datasource not found [%s]", dataSourceName);
+      return Response.noContent().build();
+    }
+
+    long markedSegmentCount = 0;
+    try {
+      final Interval interval = payload.getInterval();
+      final Set<String> segmentIds = payload.getSegmentIds();
+      if (interval != null) {
+        markedSegmentCount = databaseSegmentManager.disableSegments(dataSourceName, interval);
+      } else if (segmentIds != null) {
+        markedSegmentCount = databaseSegmentManager.disableSegments(dataSourceName, segmentIds);
+      }
+    }
+    catch (Exception e) {
+      return Response.serverError().entity(
+          ImmutableMap.of(
+              "error",
+              "Exception occurred.",
+              "message",
+              e.toString()
+          )
+      ).build();
+
+    }
+    if (markedSegmentCount == 0) {
+      return Response.noContent().build();
+    }
+    return Response.ok().build();
+  }
+
   static boolean isSegmentLoaded(Iterable<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
   {
     for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
@@ -700,4 +752,38 @@ public class DataSourcesResource
     }
     return false;
   }
+
+  /**
+   * JSON request body for the {@code markUnused} endpoint. A valid payload carries exactly one of
+   * {@code interval} or a non-empty {@code segmentIds} set.
+   */
+  @VisibleForTesting
+  protected static class MarkDatasourceSegmentsPayload
+  {
+    private final Interval interval;
+    private final Set<String> segmentIds;
+
+    /**
+     * @param interval   interval whose segments should be marked unused, or {@code null}
+     * @param segmentIds ids of the segments that should be marked unused, or {@code null}
+     */
+    @JsonCreator
+    public MarkDatasourceSegmentsPayload(
+        @JsonProperty("interval") Interval interval,
+        @JsonProperty("segmentIds") Set<String> segmentIds
+    )
+    {
+      this.interval = interval;
+      this.segmentIds = segmentIds;
+    }
+
+    /** Returns the interval passed to the constructor; may be {@code null}. */
+    @JsonProperty
+    public Interval getInterval()
+    {
+      return interval;
+    }
+
+    /** Returns the segment id set passed to the constructor; may be {@code null}. */
+    @JsonProperty
+    public Set<String> getSegmentIds()
+    {
+      return segmentIds;
+    }
+
+    /**
+     * Checks that exactly one selector is present and, when that selector is the id set, that the
+     * set is not empty.
+     */
+    public boolean isValid()
+    {
+      if (interval != null) {
+        // An interval was given, so the id set must be absent.
+        return segmentIds == null;
+      }
+      return segmentIds != null && !segmentIds.isEmpty();
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
index 7d1e178..d87df68 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -276,6 +277,302 @@ public class SQLMetadataSegmentManagerTest
   }
 
   @Test
+  public void testDisableSegmentsWithSegmentIds() throws IOException
+  {
+    // Publishes two distinct segments of a new datasource, disables both by id, and checks that
+    // only the pre-existing fixture segments remain after the next poll.
+    manager.start();
+    manager.poll();
+    Assert.assertTrue(manager.isStarted());
+
+    final String datasource = "wikipedia2";
+    final DataSegment newSegment1 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    // Uses a different interval from newSegment1 so the two segments have different ids.
+    final DataSegment newSegment2 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    publisher.publishSegment(newSegment1);
+    publisher.publishSegment(newSegment2);
+    final ImmutableList<String> segmentIds = ImmutableList.of(
+        newSegment1.getId().toString(),
+        newSegment2.getId().toString()
+    );
+
+    // Every requested id matches a published segment, so each one must be counted.
+    Assert.assertEquals(segmentIds.size(), manager.disableSegments(datasource, segmentIds));
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+  }
+
+  @Test
+  public void testDisableSegmentsWithSegmentIdsInvalidDatasource() throws IOException
+  {
+    manager.start();
+    manager.poll();
+    Assert.assertTrue(manager.isStarted());
+
+    final String datasource = "wikipedia2";
+    final DataSegment newSegment1 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment2 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    publisher.publishSegment(newSegment1);
+    publisher.publishSegment(newSegment2);
+    final ImmutableList<String> segmentIds = ImmutableList.of(
+        newSegment1.getId().toString(),
+        newSegment2.getId().toString()
+    );
+    // none of the segments are in datasource
+    Assert.assertEquals(0, manager.disableSegments("wrongDataSource", segmentIds));
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+  }
+
+  @Test
+  public void testDisableSegmentsWithInterval() throws IOException
+  {
+    manager.start();
+    manager.poll();
+    Assert.assertTrue(manager.isStarted());
+
+    final String datasource = "wikipedia2";
+    final DataSegment newSegment1 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment2 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment3 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-19T00:00:00.000/2017-10-20T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    publisher.publishSegment(newSegment1);
+    publisher.publishSegment(newSegment2);
+    publisher.publishSegment(newSegment3);
+    final Interval theInterval = Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000");
+
+    // 2 out of 3 segments match the interval
+    Assert.assertEquals(2, manager.disableSegments(datasource, theInterval));
+
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2, newSegment3),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDisableSegmentsWithInvalidInterval() throws IOException
+  {
+    manager.start();
+    manager.poll();
+    Assert.assertTrue(manager.isStarted());
+
+    final String datasource = "wikipedia2";
+    final DataSegment newSegment1 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment2 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    publisher.publishSegment(newSegment1);
+    publisher.publishSegment(newSegment2);
+    // invalid interval start > end
+    final Interval theInterval = Intervals.of("2017-10-22T00:00:00.000/2017-10-02T00:00:00.000");
+    manager.disableSegments(datasource, theInterval);
+  }
+
+  @Test
+  public void testDisableSegmentsWithOverlappingInterval() throws IOException
+  {
+    manager.start();
+    manager.poll();
+    Assert.assertTrue(manager.isStarted());
+
+    final String datasource = "wikipedia2";
+    final DataSegment newSegment1 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-17T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment2 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment3 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-19T00:00:00.000/2017-10-22T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    publisher.publishSegment(newSegment1);
+    publisher.publishSegment(newSegment2);
+    publisher.publishSegment(newSegment3);
+    final Interval theInterval = Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000");
+
+    // 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be disabled
+    Assert.assertEquals(1, manager.disableSegments(datasource, theInterval));
+
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2, newSegment1, newSegment3),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+  }
+
+  @Test
   public void testStopAndStart()
   {
     // Simulate successive losing and getting the coordinator leadership
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
index 116c20b..16a2dc7 100644
--- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.client.SegmentLoadInfo;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.metadata.MetadataSegmentManager;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -801,6 +802,273 @@ public class DataSourcesResourceTest
     );
   }
 
+  @Test
+  public void testMarkDatasourceUnusedWithSegments()
+  {
+    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
+    final Set<String> segmentIds = dataSegmentList.stream()
+                                                  .map(ds -> ds.getId().toString())
+                                                  .collect(Collectors.toSet());
+    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+
+    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
+    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
+    EasyMock.expect(segmentManager.disableSegments("datasource1", segmentIds)).andReturn(1L).once();
+    EasyMock.replay(segmentManager, inventoryView, server);
+
+    final DataSourcesResource.MarkDatasourceSegmentsPayload payload = new DataSourcesResource.MarkDatasourceSegmentsPayload(
+        null,
+        segmentIds
+    );
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentManager,
+        null,
+        null,
+        new AuthConfig(),
+        null
+    );
+    Response response = DataSourcesResource.markDatasourceUnused("datasource1", payload);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(null, response.getEntity());
+    EasyMock.verify(segmentManager, inventoryView, server);
+  }
+
+  @Test
+  public void testMarkDatasourceUnusedWithSegmentsNoContent()
+  {
+    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
+    final Set<String> segmentIds = dataSegmentList.stream()
+                                                  .map(ds -> ds.getId().toString())
+                                                  .collect(Collectors.toSet());
+    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+
+    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
+    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
+    EasyMock.expect(segmentManager.disableSegments("datasource1", segmentIds)).andReturn(0L).once();
+    EasyMock.replay(segmentManager, inventoryView, server);
+
+    final DataSourcesResource.MarkDatasourceSegmentsPayload payload = new DataSourcesResource.MarkDatasourceSegmentsPayload(
+        null,
+        segmentIds
+    );
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentManager,
+        null,
+        null,
+        new AuthConfig(),
+        null
+    );
+    Response response = DataSourcesResource.markDatasourceUnused("datasource1", payload);
+    Assert.assertEquals(204, response.getStatus());
+    Assert.assertEquals(null, response.getEntity());
+    EasyMock.verify(segmentManager, inventoryView, server);
+  }
+
+  @Test
+  public void testMarkDatasourceUnusedWithSegmentsException()
+  {
+    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
+    final Set<String> segmentIds = dataSegmentList.stream()
+                                                  .map(ds -> ds.getId().toString())
+                                                  .collect(Collectors.toSet());
+    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+
+    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
+    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
+    EasyMock.expect(segmentManager.disableSegments("datasource1", segmentIds))
+            .andThrow(new RuntimeException("Exception occurred"))
+            .once();
+    EasyMock.replay(segmentManager, inventoryView, server);
+
+    final DataSourcesResource.MarkDatasourceSegmentsPayload payload = new DataSourcesResource.MarkDatasourceSegmentsPayload(
+        null,
+        segmentIds
+    );
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentManager,
+        null,
+        null,
+        new AuthConfig(),
+        null
+    );
+    Response response = DataSourcesResource.markDatasourceUnused("datasource1", payload);
+    Assert.assertEquals(500, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    EasyMock.verify(segmentManager, inventoryView, server);
+  }
+
+  @Test
+  public void testMarkDatasourceUnusedWithInterval()
+  {
+    // An interval-only payload whose disable call reports one marked segment must yield 200 with
+    // no response entity.
+    final Interval theInterval = Intervals.of("2010-01-01/P1D");
+    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
+    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+
+    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
+    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
+    EasyMock.expect(segmentManager.disableSegments("datasource1", theInterval)).andReturn(1).once();
+    EasyMock.replay(segmentManager, inventoryView, server);
+
+    final DataSourcesResource.MarkDatasourceSegmentsPayload payload = new DataSourcesResource.MarkDatasourceSegmentsPayload(
+        theInterval,
+        null
+    );
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentManager,
+        null,
+        null,
+        new AuthConfig(),
+        null
+    );
+    Response response = DataSourcesResource.markDatasourceUnused("datasource1", payload);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(null, response.getEntity());
+    // A single verify is enough; the original repeated this call.
+    EasyMock.verify(segmentManager, inventoryView, server);
+  }
+
+  @Test
+  public void testMarkDatasourceUnusedWithIntervaNoContent()
+  {
+    final Interval theInterval = Intervals.of("2010-01-01/P1D");
+    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
+    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+
+    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
+    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
+    EasyMock.expect(segmentManager.disableSegments("datasource1", theInterval)).andReturn(0).once();
+    EasyMock.replay(segmentManager, inventoryView, server);
+
+    final DataSourcesResource.MarkDatasourceSegmentsPayload payload = new DataSourcesResource.MarkDatasourceSegmentsPayload(
+        theInterval,
+        null
+    );
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentManager,
+        null,
+        null,
+        new AuthConfig(),
+        null
+    );
+    Response response = DataSourcesResource.markDatasourceUnused("datasource1", payload);
+    Assert.assertEquals(204, response.getStatus());
+    Assert.assertEquals(null, response.getEntity());
+    EasyMock.verify(segmentManager, inventoryView, server);
+  }
+
+  @Test
+  public void testMarkDatasourceUnusedWithIntervaException()
+  {
+    final Interval theInterval = Intervals.of("2010-01-01/P1D");
+    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
+    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+
+    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
+    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
+    EasyMock.expect(segmentManager.disableSegments("datasource1", theInterval))
+            .andThrow(new RuntimeException("Exception occurred"))
+            .once();
+    EasyMock.replay(segmentManager, inventoryView, server);
+
+    final DataSourcesResource.MarkDatasourceSegmentsPayload payload = new DataSourcesResource.MarkDatasourceSegmentsPayload(
+        theInterval,
+        null
+    );
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentManager,
+        null,
+        null,
+        new AuthConfig(),
+        null
+    );
+    Response response = DataSourcesResource.markDatasourceUnused("datasource1", payload);
+    Assert.assertEquals(500, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    EasyMock.verify(segmentManager, inventoryView, server);
+  }
+
+  @Test
+  public void testMarkDatasourceUnusedNullPayload()
+  {
+    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    final DataSourcesResource.MarkDatasourceSegmentsPayload payload = null;
+
+    Response response = DataSourcesResource.markDatasourceUnused("datasource1", payload);
+    Assert.assertEquals(400, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(
+        "Invalid request payload, either interval or segmentIds array must be specified",
+        response.getEntity()
+    );
+  }
+
+  @Test
+  public void testMarkDatasourceUnusedInvalidPayload()
+  {
+    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    final DataSourcesResource.MarkDatasourceSegmentsPayload payload = new DataSourcesResource.MarkDatasourceSegmentsPayload(
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.markDatasourceUnused("datasource1", payload);
+    Assert.assertEquals(400, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+  }
+
+  @Test
+  public void testMarkDatasourceUnusedInvalidPayloadBothArguments()
+  {
+    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    final DataSourcesResource.MarkDatasourceSegmentsPayload payload = new DataSourcesResource.MarkDatasourceSegmentsPayload(
+        Intervals.of("2010-01-01/P1D"),
+        ImmutableSet.of()
+    );
+
+    Response response = DataSourcesResource.markDatasourceUnused("datasource1", payload);
+    Assert.assertEquals(400, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+  }
+
   private DruidServerMetadata createRealtimeServerMetadata(String name)
   {
     return createServerMetadata(name, ServerType.REALTIME);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org