You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/04/26 23:01:57 UTC

[incubator-druid] branch master updated: Add reload by interval API (#7490)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ebdf07b  Add reload by interval API (#7490)
ebdf07b is described below

commit ebdf07b69f5305c813ae4496d6efbace5fa6200a
Author: Adam Peck <da...@gmail.com>
AuthorDate: Fri Apr 26 17:01:50 2019 -0600

    Add reload by interval API (#7490)
    
    * Add reload by interval API
    Implements the reload proposal of #7439
    Added tests and updated docs
    
    * PR updates
    
    * Only build timeline with required segments
    Use 404 with message when a segmentId is not found
    Fix typo in doc
    Return number of segments modified.
    
    * Fix checkstyle errors
    
    * Replace String.format with StringUtils.format
    
    * Remove return value
    
    * Expand timeline to segments that overlap for intervals
    Restrict update call to only segments that need updating.
    
    * Only add overlapping enabled segments to the timeline
    
    * Some renames for clarity
    Added comments
    
    * Don't rely on cached poll data
    Only fetch required information from DB
    
    * Match error style
    
    * Merge and cleanup doc
    
    * Fix String.format call
    
    * Add unit tests
    
    * Fix unit tests that check for overshadowing
---
 docs/content/operations/api-reference.md           |  13 +-
 .../druid/metadata/MetadataSegmentManager.java     |  13 +
 .../druid/metadata/SQLMetadataSegmentManager.java  | 250 ++++++++----
 .../druid/metadata/UnknownSegmentIdException.java  |  31 ++
 .../druid/server/http/DataSourcesResource.java     |  57 ++-
 .../metadata/SQLMetadataSegmentManagerTest.java    | 447 +++++++++++++++++++++
 .../druid/server/http/DataSourcesResourceTest.java | 299 ++++++++++++++
 7 files changed, 1038 insertions(+), 72 deletions(-)

diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md
index 23b98c1..0f50316 100644
--- a/docs/content/operations/api-reference.md
+++ b/docs/content/operations/api-reference.md
@@ -235,10 +235,18 @@ Enables all segments of datasource which are not overshadowed by others.
 
 Enables a segment of a datasource.
 
+* `/druid/coordinator/v1/datasources/{dataSourceName}/markUsed`
+
 * `/druid/coordinator/v1/datasources/{dataSourceName}/markUnused`
 
-Marks segments unused for a datasource by interval or set of segment Ids. The request payload contains the interval or set of segment Ids to be marked unused.
-Either interval or segment ids should be provided, if both or none are provided in the payload , the API would throw an error (400 BAD REQUEST).Interval specifies the start and end times as IS0 8601 strings. `interval=(start/end)` where start and end both are inclusive and only the segments completely contained within the specified interval will be disabled, partially overlapping segments will not be affected.
+Marks segments (un)used for a datasource by interval or set of segment Ids. 
+
+When marking used only segments that are not overshadowed will be updated.
+
+The request payload contains the interval or set of segment Ids to be marked unused.
+Either interval or segment ids should be provided, if both or none are provided in the payload, the API would throw an error (400 BAD REQUEST).
+
+Interval specifies the start and end times as IS0 8601 strings. `interval=(start/end)` where start and end both are inclusive and only the segments completely contained within the specified interval will be disabled, partially overlapping segments will not be affected.
 
 JSON Request Payload:
 
@@ -247,7 +255,6 @@ JSON Request Payload:
 |`interval`|The interval for which to mark segments unused|"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"|
 |`segmentIds`|Set of segment Ids to be marked unused|["segmentId1", "segmentId2"]|
 
-
 ##### DELETE<a name="coordinator-delete"></a>
 
 * `/druid/coordinator/v1/datasources/{dataSourceName}`
diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
index 23cd68a..4fc5177 100644
--- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
@@ -37,10 +37,23 @@ public interface MetadataSegmentManager
 
   void stop();
 
+  /**
+   * Enables all segments for a dataSource which will not be overshadowed.
+   */
   boolean enableDataSource(String dataSource);
 
   boolean enableSegment(String segmentId);
 
+  /**
+   * Enables all segments contained in the interval which are not overshadowed by any currently enabled segments.
+   */
+  int enableSegments(String dataSource, Interval interval);
+
+  /**
+   * Enables the segments passed which are not overshadowed by any currently enabled segments.
+   */
+  int enableSegments(String dataSource, Collection<String> segmentIds);
+
   boolean removeDataSource(String dataSource);
 
   /**
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
index 018d4a9..35843c9 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
@@ -22,7 +22,6 @@ package org.apache.druid.metadata;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import org.apache.druid.client.DruidDataSource;
@@ -30,7 +29,9 @@ import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.MapUtils;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -38,9 +39,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
-import org.apache.druid.timeline.partition.PartitionChunk;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.skife.jdbi.v2.BaseResultSetMapper;
@@ -48,13 +47,11 @@ import org.skife.jdbi.v2.Batch;
 import org.skife.jdbi.v2.FoldController;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.tweak.ResultSetMapper;
-import org.skife.jdbi.v2.util.ByteArrayMapper;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -220,83 +217,200 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
     }
   }
 
-  @Override
-  public boolean enableDataSource(final String dataSource)
+  private Pair<DataSegment, Boolean> usedPayloadMapper(
+      final int index,
+      final ResultSet resultSet,
+      final StatementContext context
+  ) throws SQLException
   {
     try {
-      final IDBI dbi = connector.getDBI();
-      VersionedIntervalTimeline<String, DataSegment> segmentTimeline = connector.inReadOnlyTransaction(
-          (handle, status) -> VersionedIntervalTimeline.forSegments(
-              Iterators.transform(
-                  handle
-                      .createQuery(
-                          StringUtils.format(
-                              "SELECT payload FROM %s WHERE dataSource = :dataSource",
-                              getSegmentsTable()
-                          )
-                      )
-                      .setFetchSize(connector.getStreamingFetchSize())
-                      .bind("dataSource", dataSource)
-                      .map(ByteArrayMapper.FIRST)
-                      .iterator(),
-                  payload -> {
-                    try {
-                      return jsonMapper.readValue(payload, DataSegment.class);
-                    }
-                    catch (IOException e) {
-                      throw new RuntimeException(e);
-                    }
-                  }
-              )
-
-          )
-      );
-
-      final List<DataSegment> segments = new ArrayList<>();
-      List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders = segmentTimeline.lookup(
-          Intervals.of("0000-01-01/3000-01-01")
+      return new Pair<>(
+          jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class),
+          resultSet.getBoolean("used")
       );
-      for (TimelineObjectHolder<String, DataSegment> objectHolder : timelineObjectHolders) {
-        for (PartitionChunk<DataSegment> partitionChunk : objectHolder.getObject()) {
-          segments.add(partitionChunk.getObject());
-        }
-      }
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 
-      if (segments.isEmpty()) {
-        log.warn("No segments found in the database!");
-        return false;
-      }
+  /**
+   * Gets a list of all datasegments that overlap the provided interval along with thier used status.
+   */
+  private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval(
+      final String dataSource,
+      final Interval interval
+  )
+  {
+    return connector.inReadOnlyTransaction(
+        (handle, status) -> handle.createQuery(
+            StringUtils.format(
+                "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start",
+                getSegmentsTable(),
+                connector.getQuoteString()
+            )
+        )
+        .setFetchSize(connector.getStreamingFetchSize())
+        .bind("dataSource", dataSource)
+        .bind("start", interval.getStart().toString())
+        .bind("end", interval.getEnd().toString())
+        .map(this::usedPayloadMapper)
+        .list()
+    );
+  }
 
-      dbi.withHandle(
-          new HandleCallback<Void>()
-          {
-            @Override
-            public Void withHandle(Handle handle)
-            {
-              Batch batch = handle.createBatch();
+  private List<Pair<DataSegment, Boolean>> getDataSegments(
+      final String dataSource,
+      final Collection<String> segmentIds,
+      final Handle handle
+  )
+  {
+    return segmentIds.stream().map(
+        segmentId -> Optional.ofNullable(
+            handle.createQuery(
+                StringUtils.format(
+                    "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND id = :id",
+                    getSegmentsTable()
+                )
+            )
+            .bind("dataSource", dataSource)
+            .bind("id", segmentId)
+            .map(this::usedPayloadMapper)
+            .first()
+        )
+        .orElseThrow(() -> new UnknownSegmentIdException(StringUtils.format("Cannot find segment id [%s]", segmentId)))
+    )
+    .collect(Collectors.toList());
+  }
 
-              for (DataSegment segment : segments) {
-                batch.add(
-                    StringUtils.format(
-                        "UPDATE %s SET used=true WHERE id = '%s'",
-                        getSegmentsTable(),
-                        segment.getId()
-                    )
-                );
+  /**
+   * Builds a VersionedIntervalTimeline containing used segments that overlap the intervals passed.
+   */
+  private VersionedIntervalTimeline<String, DataSegment> buildVersionedIntervalTimeline(
+      final String dataSource,
+      final Collection<Interval> intervals,
+      final Handle handle
+  )
+  {
+    return VersionedIntervalTimeline.forSegments(intervals
+        .stream()
+        .flatMap(interval -> handle.createQuery(
+                StringUtils.format(
+                    "SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start AND used = true",
+                    getSegmentsTable(),
+                    connector.getQuoteString()
+                )
+            )
+            .setFetchSize(connector.getStreamingFetchSize())
+            .bind("dataSource", dataSource)
+            .bind("start", interval.getStart().toString())
+            .bind("end", interval.getEnd().toString())
+            .map((i, resultSet, context) -> {
+              try {
+                return jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
+              }
+              catch (IOException e) {
+                throw new RuntimeException(e);
               }
-              batch.execute();
+            })
+            .list()
+            .stream()
+        )
+        .iterator()
+    );
+  }
 
-              return null;
-            }
-          }
-      );
+  @Override
+  public boolean enableDataSource(final String dataSource)
+  {
+    try {
+      return enableSegments(dataSource, Intervals.ETERNITY) != 0;
     }
     catch (Exception e) {
       log.error(e, "Exception enabling datasource %s", dataSource);
       return false;
     }
+  }
 
-    return true;
+  @Override
+  public int enableSegments(final String dataSource, final Interval interval)
+  {
+    List<Pair<DataSegment, Boolean>> segments = getDataSegmentsOverlappingInterval(dataSource, interval);
+    List<DataSegment> segmentsToEnable = segments.stream()
+        .filter(segment -> !segment.rhs && interval.contains(segment.lhs.getInterval()))
+        .map(segment -> segment.lhs)
+        .collect(Collectors.toList());
+
+    VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = VersionedIntervalTimeline.forSegments(
+        segments.stream().filter(segment -> segment.rhs).map(segment -> segment.lhs).iterator()
+    );
+    VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, segmentsToEnable.iterator());
+
+    return enableSegments(
+        segmentsToEnable,
+        versionedIntervalTimeline
+    );
+  }
+
+  @Override
+  public int enableSegments(final String dataSource, final Collection<String> segmentIds)
+  {
+    Pair<List<DataSegment>, VersionedIntervalTimeline<String, DataSegment>> data = connector.inReadOnlyTransaction(
+        (handle, status) -> {
+          List<DataSegment> segments = getDataSegments(dataSource, segmentIds, handle)
+              .stream()
+              .filter(pair -> !pair.rhs)
+              .map(pair -> pair.lhs)
+              .collect(Collectors.toList());
+
+          VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = buildVersionedIntervalTimeline(
+              dataSource,
+              JodaUtils.condenseIntervals(segments.stream().map(segment -> segment.getInterval()).collect(Collectors.toList())),
+              handle
+          );
+          VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, segments.iterator());
+
+          return new Pair<>(
+              segments,
+              versionedIntervalTimeline
+          );
+        }
+    );
+
+    return enableSegments(
+        data.lhs,
+        data.rhs
+    );
+  }
+
+  private int enableSegments(
+      final Collection<DataSegment> segments,
+      final VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline
+  )
+  {
+    if (segments.isEmpty()) {
+      log.warn("No segments found to update!");
+      return 0;
+    }
+
+    return connector.getDBI().withHandle(handle -> {
+      Batch batch = handle.createBatch();
+      segments
+          .stream()
+          .map(segment -> segment.getId())
+          .filter(segmentId -> !versionedIntervalTimeline.isOvershadowed(
+              segmentId.getInterval(),
+              segmentId.getVersion()
+          ))
+          .forEach(segmentId -> batch.add(
+              StringUtils.format(
+                  "UPDATE %s SET used=true WHERE id = '%s'",
+                  getSegmentsTable(),
+                  segmentId
+              )
+          ));
+      return batch.execute().length;
+    });
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/metadata/UnknownSegmentIdException.java b/server/src/main/java/org/apache/druid/metadata/UnknownSegmentIdException.java
new file mode 100644
index 0000000..cca37b9
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/metadata/UnknownSegmentIdException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+/**
+ * Exception thrown by MetadataSegmentManager when an segment id is unknown.
+ */
+public class UnknownSegmentIdException extends RuntimeException
+{
+  public UnknownSegmentIdException(String message)
+  {
+    super(message);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 00ba289..5904d52 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -21,7 +21,7 @@ package org.apache.druid.server.http;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.api.client.repackaged.com.google.common.annotations.VisibleForTesting;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.guava.FunctionalIterable;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.metadata.MetadataSegmentManager;
+import org.apache.druid.metadata.UnknownSegmentIdException;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -753,6 +754,60 @@ public class DataSourcesResource
     return false;
   }
 
+  @POST
+  @Path("/{dataSourceName}/markUsed")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response enableDatasourceSegments(
+      @PathParam("dataSourceName") String dataSourceName,
+      MarkDatasourceSegmentsPayload payload
+  )
+  {
+    if (payload == null || !payload.isValid()) {
+      return Response.status(Response.Status.BAD_REQUEST)
+                     .entity("Invalid request payload, either interval or segmentIds array must be specified")
+                     .build();
+    }
+
+    final ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
+    if (dataSource == null) {
+      return Response.noContent().build();
+    }
+
+    int modified;
+    try {
+      if (payload.getInterval() != null) {
+        modified = databaseSegmentManager.enableSegments(dataSource.getName(), payload.getInterval());
+      } else {
+        modified = databaseSegmentManager.enableSegments(dataSource.getName(), payload.getSegmentIds());
+      }
+    }
+    catch (Exception e) {
+      if (e.getCause() instanceof UnknownSegmentIdException) {
+        return Response.status(Response.Status.NOT_FOUND).entity(
+            ImmutableMap.of(
+                "message",
+                e.getCause().getMessage()
+            )
+        ).build();
+      }
+      return Response.serverError().entity(
+          ImmutableMap.of(
+              "error",
+              "Exception occurred.",
+              "message",
+              e.getMessage()
+          )
+      ).build();
+    }
+
+    if (modified == 0) {
+      return Response.noContent().build();
+    }
+
+    return Response.ok().build();
+  }
+
   @VisibleForTesting
   protected static class MarkDatasourceSegmentsPayload
   {
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
index d87df68..e796448 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -32,12 +33,14 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.hamcrest.core.IsInstanceOf;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.util.stream.Collectors;
@@ -48,6 +51,9 @@ public class SQLMetadataSegmentManagerTest
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
 
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
   private SQLMetadataSegmentManager manager;
   private SQLMetadataSegmentPublisher publisher;
   private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
@@ -84,6 +90,21 @@ public class SQLMetadataSegmentManagerTest
       1234L
   );
 
+  private void publish(DataSegment segment, boolean used) throws IOException
+  {
+    publisher.publishSegment(
+        segment.getId().toString(),
+        segment.getDataSource(),
+        DateTimes.nowUtc().toString(),
+        segment.getInterval().getStart().toString(),
+        segment.getInterval().getEnd().toString(),
+        (segment.getShardSpec() instanceof NoneShardSpec) ? false : true,
+        segment.getVersion(),
+        used,
+        jsonMapper.writeValueAsBytes(segment)
+    );
+  }
+
   @Before
   public void setUp() throws Exception
   {
@@ -277,6 +298,432 @@ public class SQLMetadataSegmentManagerTest
   }
 
   @Test
+  public void testEnableSegmentsWithSegmentIds() throws IOException
+  {
+    manager.start();
+    manager.poll();
+    Assert.assertTrue(manager.isStarted());
+
+    final String datasource = "wikipedia2";
+    final DataSegment newSegment1 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-17T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment2 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
+        "2017-10-16T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        1,
+        1234L
+    );
+
+    // Overshadowed by newSegment2
+    final DataSegment newSegment3 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        1,
+        1234L
+    );
+
+    publish(newSegment1, false);
+    publish(newSegment2, false);
+    publish(newSegment3, false);
+    final ImmutableList<String> segmentIds = ImmutableList.of(
+        newSegment1.getId().toString(),
+        newSegment2.getId().toString(),
+        newSegment3.getId().toString()
+    );
+
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+    Assert.assertEquals(2, manager.enableSegments(datasource, segmentIds));
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+  }
+
+  @Test
+  public void testEnableSegmentsWithSegmentIdsInvalidDatasource() throws IOException
+  {
+    thrown.expectCause(IsInstanceOf.instanceOf(UnknownSegmentIdException.class));
+    manager.start();
+    manager.poll();
+    Assert.assertTrue(manager.isStarted());
+
+    final String datasource = "wikipedia2";
+    final DataSegment newSegment1 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment2 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    publish(newSegment1, false);
+    publish(newSegment2, false);
+    final ImmutableList<String> segmentIds = ImmutableList.of(
+        newSegment1.getId().toString(),
+        newSegment2.getId().toString()
+    );
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+    // none of the segments are in datasource
+    Assert.assertEquals(0, manager.enableSegments("wrongDataSource", segmentIds));
+  }
+
+  @Test
+  public void testEnableSegmentsWithInvalidSegmentIds()
+  {
+    thrown.expectCause(IsInstanceOf.instanceOf(UnknownSegmentIdException.class));
+    manager.start();
+    manager.poll();
+    Assert.assertTrue(manager.isStarted());
+
+    final String datasource = "wikipedia2";
+    final DataSegment newSegment1 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment2 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final ImmutableList<String> segmentIds = ImmutableList.of(
+        newSegment1.getId().toString(),
+        newSegment2.getId().toString()
+    );
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+    // none of the segments are in datasource
+    Assert.assertEquals(0, manager.enableSegments(datasource, segmentIds));
+  }
+
+  @Test
+  public void testEnableSegmentsWithInterval() throws IOException
+  {
+    manager.start();
+    manager.poll();
+    Assert.assertTrue(manager.isStarted());
+
+    final String datasource = "wikipedia2";
+    final DataSegment newSegment1 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment2 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
+        "2017-10-16T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        1,
+        1234L
+    );
+
+    final DataSegment newSegment3 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-19T00:00:00.000/2017-10-20T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    // Overshadowed by newSegment2
+    final DataSegment newSegment4 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    publish(newSegment1, false);
+    publish(newSegment2, false);
+    publish(newSegment3, false);
+    publish(newSegment4, false);
+    final Interval theInterval = Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000");
+
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+
+    // 2 out of 3 segments match the interval
+    Assert.assertEquals(2, manager.enableSegments(datasource, theInterval));
+
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEnableSegmentsWithInvalidInterval() throws IOException
+  {
+    manager.start();
+    manager.poll();
+    Assert.assertTrue(manager.isStarted());
+
+    final String datasource = "wikipedia2";
+    final DataSegment newSegment1 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment2 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    publish(newSegment1, false);
+    publish(newSegment2, false);
+    // invalid interval start > end
+    final Interval theInterval = Intervals.of("2017-10-22T00:00:00.000/2017-10-02T00:00:00.000");
+    manager.enableSegments(datasource, theInterval);
+  }
+
+  @Test
+  public void testEnableSegmentsWithOverlappingInterval() throws IOException
+  {
+    manager.start();
+    manager.poll();
+    Assert.assertTrue(manager.isStarted());
+
+    final String datasource = "wikipedia2";
+    final DataSegment newSegment1 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-15T00:00:00.000/2017-10-17T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    final DataSegment newSegment2 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
+        "2017-10-16T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        1,
+        1234L
+    );
+
+    final DataSegment newSegment3 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-19T00:00:00.000/2017-10-22T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    // Overshadowed by newSegment2
+    final DataSegment newSegment4 = new DataSegment(
+        datasource,
+        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
+        "2017-10-15T20:19:12.565Z",
+        ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", "test",
+            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
+        ),
+        ImmutableList.of("dim1", "dim2", "dim3"),
+        ImmutableList.of("count", "value"),
+        NoneShardSpec.instance(),
+        0,
+        1234L
+    );
+
+    publish(newSegment1, false);
+    publish(newSegment2, false);
+    publish(newSegment3, false);
+    publish(newSegment4, false);
+    final Interval theInterval = Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000");
+
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+
+    // 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be disabled
+    Assert.assertEquals(1, manager.enableSegments(datasource, theInterval));
+
+    manager.poll();
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2, newSegment2),
+        ImmutableSet.copyOf(manager.iterateAllSegments())
+    );
+  }
+
+  @Test
   public void testDisableSegmentsWithSegmentIds() throws IOException
   {
     manager.start();
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
index 16a2dc7..03b2146 100644
--- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
@@ -667,6 +667,305 @@ public class DataSourcesResourceTest
   }
 
   @Test
+  public void testEnableDatasourceSegment()
+  {
+    MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+    EasyMock.expect(metadataSegmentManager.enableSegment(dataSegmentList.get(0).getId().toString()))
+        .andReturn(true)
+        .once();
+    EasyMock.replay(metadataSegmentManager);
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        null,
+        metadataSegmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.enableDatasourceSegment(dataSegmentList.get(0).getDataSource(), dataSegmentList.get(0).getId().toString());
+    Assert.assertEquals(200, response.getStatus());
+    EasyMock.verify(metadataSegmentManager);
+  }
+
+  @Test
+  public void testEnableDatasourceSegmentFailed()
+  {
+    MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+    EasyMock.expect(metadataSegmentManager.enableSegment(dataSegmentList.get(0).getId().toString()))
+            .andReturn(false)
+            .once();
+    EasyMock.replay(metadataSegmentManager);
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        null,
+        metadataSegmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.enableDatasourceSegment(dataSegmentList.get(0).getDataSource(), dataSegmentList.get(0).getId().toString());
+    Assert.assertEquals(204, response.getStatus());
+    EasyMock.verify(metadataSegmentManager);
+  }
+
+  @Test
+  public void testEnableDatasourceSegmentsInterval()
+  {
+    MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+    DruidDataSource dataSource = new DruidDataSource("datasource1", new HashMap<>());
+    Interval interval = Intervals.of("2010-01-22/P1D");
+    EasyMock.expect(metadataSegmentManager.enableSegments(EasyMock.eq("datasource1"), EasyMock.eq(interval)))
+            .andReturn(3)
+            .once();
+    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
+    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource).once();
+    EasyMock.replay(metadataSegmentManager, inventoryView, server);
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        metadataSegmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.enableDatasourceSegments(
+        "datasource1",
+        new DataSourcesResource.MarkDatasourceSegmentsPayload(
+            interval,
+            null
+        )
+    );
+    Assert.assertEquals(200, response.getStatus());
+    EasyMock.verify(metadataSegmentManager, inventoryView, server);
+  }
+
+  @Test
+  public void testEnableDatasourceSegmentsIntervalNoneUpdated()
+  {
+    MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+    DruidDataSource dataSource = new DruidDataSource("datasource1", new HashMap<>());
+    Interval interval = Intervals.of("2010-01-22/P1D");
+    EasyMock.expect(metadataSegmentManager.enableSegments(EasyMock.eq("datasource1"), EasyMock.eq(interval)))
+            .andReturn(0)
+            .once();
+    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
+    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource).once();
+    EasyMock.replay(metadataSegmentManager, inventoryView, server);
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        metadataSegmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.enableDatasourceSegments(
+        "datasource1",
+        new DataSourcesResource.MarkDatasourceSegmentsPayload(
+            interval,
+            null
+        )
+    );
+    Assert.assertEquals(204, response.getStatus());
+    EasyMock.verify(metadataSegmentManager, inventoryView, server);
+  }
+
+  @Test
+  public void testEnableDatasourceSegmentsSet()
+  {
+    MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+    DruidDataSource dataSource = new DruidDataSource("datasource1", new HashMap<>());
+    Set<String> segmentIds = ImmutableSet.of(dataSegmentList.get(1).getId().toString());
+    EasyMock.expect(metadataSegmentManager.enableSegments(EasyMock.eq("datasource1"), EasyMock.eq(segmentIds)))
+            .andReturn(3)
+            .once();
+    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
+    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource).once();
+    EasyMock.replay(metadataSegmentManager, inventoryView, server);
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        metadataSegmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.enableDatasourceSegments(
+        "datasource1",
+        new DataSourcesResource.MarkDatasourceSegmentsPayload(
+            null,
+            segmentIds
+        )
+    );
+    Assert.assertEquals(200, response.getStatus());
+    EasyMock.verify(metadataSegmentManager, inventoryView, server);
+  }
+
+  @Test
+  public void testEnableDatasourceSegmentsIntervalException()
+  {
+    MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+    DruidDataSource dataSource = new DruidDataSource("datasource1", new HashMap<>());
+    Interval interval = Intervals.of("2010-01-22/P1D");
+    EasyMock.expect(metadataSegmentManager.enableSegments(EasyMock.eq("datasource1"), EasyMock.eq(interval)))
+            .andThrow(new RuntimeException("Error!"))
+            .once();
+    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
+    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource).once();
+    EasyMock.replay(metadataSegmentManager, inventoryView, server);
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        metadataSegmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.enableDatasourceSegments(
+        "datasource1",
+        new DataSourcesResource.MarkDatasourceSegmentsPayload(
+            interval,
+            null
+        )
+    );
+    Assert.assertEquals(500, response.getStatus());
+    EasyMock.verify(metadataSegmentManager, inventoryView, server);
+  }
+
+  @Test
+  public void testEnableDatasourceSegmentslNoDatasource()
+  {
+    MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
+    EasyMock.expect(server.getDataSource("datasource1")).andReturn(null).once();
+    EasyMock.replay(metadataSegmentManager, inventoryView, server);
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        metadataSegmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.enableDatasourceSegments(
+        "datasource1",
+        new DataSourcesResource.MarkDatasourceSegmentsPayload(
+          Intervals.of("2010-01-22/P1D"),
+          null
+        )
+    );
+    Assert.assertEquals(204, response.getStatus());
+    EasyMock.verify(metadataSegmentManager);
+  }
+
+  @Test
+  public void testEnableDatasourceSegmentsInvalidPayloadNoArguments()
+  {
+    MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        metadataSegmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.enableDatasourceSegments(
+        "datasource1",
+        new DataSourcesResource.MarkDatasourceSegmentsPayload(
+            null,
+            null
+        )
+    );
+    Assert.assertEquals(400, response.getStatus());
+  }
+
+  @Test
+  public void testEnableDatasourceSegmentsInvalidPayloadBothArguments()
+  {
+    MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        metadataSegmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.enableDatasourceSegments(
+        "datasource1",
+        new DataSourcesResource.MarkDatasourceSegmentsPayload(
+            Intervals.of("2010-01-22/P1D"),
+            ImmutableSet.of()
+        )
+    );
+    Assert.assertEquals(400, response.getStatus());
+  }
+
+  @Test
+  public void testEnableDatasourceSegmentsInvalidPayloadEmptyArray()
+  {
+    MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        metadataSegmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.enableDatasourceSegments(
+        "datasource1",
+        new DataSourcesResource.MarkDatasourceSegmentsPayload(
+            null,
+            ImmutableSet.of()
+        )
+    );
+    Assert.assertEquals(400, response.getStatus());
+  }
+
+  @Test
+  public void testEnableDatasourceSegmentsNoPayload()
+  {
+    MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
+
+    DataSourcesResource DataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        metadataSegmentManager,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Response response = DataSourcesResource.enableDatasourceSegments(
+        "datasource1",
+        null
+    );
+    Assert.assertEquals(400, response.getStatus());
+  }
+
+  @Test
   public void testSegmentLoadChecksForVersion()
   {
     Interval interval = Intervals.of(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org