You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by za...@apache.org on 2023/04/06 17:13:16 UTC

[druid] branch master updated: Allow for Input source security in native task layer (#14003)

This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c0221375c  Allow for Input source security in native task layer (#14003)
5c0221375c is described below

commit 5c0221375c8dfa4c3509eb9e1b6864941dfe84c7
Author: zachjsh <za...@gmail.com>
AuthorDate: Thu Apr 6 13:13:09 2023 -0400

     Allow for Input source security in native task layer (#14003)
    
    Fixes #13837.
    
    ### Description
    
    This change allows for input source type security in the native task layer.
    
    To enable this feature, the user must set the following property to true:
    
    `druid.auth.enableInputSourceSecurity=true`
    
    The default value for this property is false, which will continue the existing functionality of needing authorization to write to the respective datasource.
    
    When this config is enabled, the users will be required to be authorized for the following resource action, in addition to write permission on the respective datasource.
    
    `new ResourceAction(new Resource(ResourceType.EXTERNAL, {INPUT_SOURCE_TYPE}, Action.READ`
    
    where `{INPUT_SOURCE_TYPE}` is the type of the input source being used;, http, inline, s3, etc..
    
    Only tasks that provide a non-default implementation of the `getInputSourceResources` method can be submitted when config `druid.auth.enableInputSourceSecurity=true` is set. Otherwise, a 400 error will be thrown.
---
 .idea/misc.xml                                     |  16 +-
 .../druid/data/input/azure/AzureInputSource.java   |  12 ++
 .../data/input/azure/AzureInputSourceTest.java     |  18 ++
 .../google/GoogleCloudStorageInputSource.java      |  13 ++
 .../google/GoogleCloudStorageInputSourceTest.java  |   8 +
 .../druid/inputsource/hdfs/HdfsInputSource.java    |  13 ++
 .../druid/storage/hdfs/HdfsStorageDruidModule.java |   2 +-
 .../inputsource/hdfs/HdfsInputSourceTest.java      |  14 ++
 .../druid/indexing/kafka/KafkaIndexTask.java       |  20 ++
 .../kafka/supervisor/KafkaSupervisorSpec.java      |  21 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  43 ++++
 .../kafka/supervisor/KafkaSupervisorTest.java      |  11 +
 .../druid/indexing/kinesis/KinesisIndexTask.java   |  20 ++
 .../kinesis/supervisor/KinesisSupervisorSpec.java  |  21 +-
 .../kinesis/KinesisIndexTaskSerdeTest.java         |  12 ++
 .../kinesis/supervisor/KinesisSupervisorTest.java  |  57 ++++++
 .../apache/druid/data/input/s3/S3InputSource.java  |  12 ++
 .../druid/data/input/s3/S3InputSourceTest.java     |  19 ++
 .../druid/indexing/common/task/CompactionTask.java |  10 +
 .../indexing/common/task/HadoopIndexTask.java      |  14 ++
 .../druid/indexing/common/task/IndexTask.java      |  22 ++
 .../druid/indexing/common/task/NoopTask.java       |  12 ++
 .../apache/druid/indexing/common/task/Task.java    |  33 +++
 .../batch/parallel/LegacySinglePhaseSubTask.java   |  25 +++
 .../parallel/ParallelIndexSupervisorTask.java      |  21 ++
 .../parallel/PartialDimensionCardinalityTask.java  |  25 +++
 .../parallel/PartialDimensionDistributionTask.java |  25 +++
 .../parallel/PartialHashSegmentGenerateTask.java   |  23 +++
 .../parallel/PartialRangeSegmentGenerateTask.java  |  24 +++
 .../task/batch/parallel/SinglePhaseSubTask.java    |  23 +++
 .../indexing/overlord/http/OverlordResource.java   |  45 +++-
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |  15 ++
 .../common/task/CompactionTaskParallelRunTest.java |  14 ++
 .../indexing/common/task/CompactionTaskTest.java   |  19 ++
 .../indexing/common/task/HadoopIndexTaskTest.java  |  81 ++++++++
 .../druid/indexing/common/task/IndexTaskTest.java  |  52 +++++
 .../druid/indexing/common/task/NoopTaskTest.java   |  33 +++
 .../common/task/RealtimeIndexTaskTest.java         |  11 +
 .../druid/indexing/common/task/TaskTest.java       | 131 ++++++++++++
 .../PartialDimensionCardinalityTaskTest.java       |  19 ++
 .../PartialDimensionDistributionTaskTest.java      |  29 ++-
 .../PartialHashSegmentGenerateTaskTest.java        |  18 ++
 .../PartialRangeSegmentGenerateTaskTest.java       |  18 ++
 .../parallel/SinglePhaseParallelIndexingTest.java  |  12 ++
 .../batch/parallel/SinglePhaseSubTaskSpecTest.java |  13 ++
 .../overlord/http/OverlordResourceTest.java        | 227 +++++++++++++++++----
 .../druid/indexing/overlord/http/OverlordTest.java |   3 +-
 .../org/apache/druid/data/input/InputSource.java   |  15 ++
 .../data/input/impl/CombiningInputSource.java      |  17 ++
 .../druid/data/input/impl/HttpInputSource.java     |  11 +
 .../druid/data/input/impl/InlineInputSource.java   |  11 +
 .../druid/data/input/impl/LocalInputSource.java    |  11 +
 .../data/input/impl/CombiningInputSourceTest.java  |  36 ++++
 .../data/input/impl/InlineInputSourceTest.java     |  35 ++++
 .../druid/data/input/impl/InputSourceTest.java     |  65 ++++++
 .../data/input/impl/LocalInputSourceTest.java      |   8 +
 .../overlord/supervisor/NoopSupervisorSpec.java    |  13 ++
 .../overlord/supervisor/SupervisorSpec.java        |  22 ++
 .../druid/metadata/input/InputSourceModule.java    |   2 +-
 .../druid/metadata/input/SqlInputSource.java       |  12 ++
 .../druid/indexing/NoopSupervisorSpecTest.java     |   7 +
 .../overlord/supervisor/SupervisorSpecTest.java    |  68 ++++++
 .../druid/metadata/input/SqlInputSourceTest.java   |  12 ++
 .../planner/SqlResourceCollectorShuttle.java       |   4 +-
 64 files changed, 1610 insertions(+), 68 deletions(-)

diff --git a/.idea/misc.xml b/.idea/misc.xml
index bf2061d739..e66748b782 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -31,6 +31,7 @@
       <writeAnnotation name="org.powermock.api.easymock.annotation.Mock" />
     </writeAnnotations>
   </component>
+  <component name="ExternalStorageConfigurationManager" enabled="true" />
   <component name="JavaScriptSettings">
     <option name="languageLevel" value="ES6" />
   </component>
@@ -46,7 +47,7 @@
     <option name="myDefaultNotNull" value="javax.annotation.Nonnull" />
     <option name="myNullables">
       <value>
-        <list size="12">
+        <list size="15">
           <item index="0" class="java.lang.String" itemvalue="org.jetbrains.annotations.Nullable" />
           <item index="1" class="java.lang.String" itemvalue="javax.annotation.Nullable" />
           <item index="2" class="java.lang.String" itemvalue="javax.annotation.CheckForNull" />
@@ -59,12 +60,15 @@
           <item index="9" class="java.lang.String" itemvalue="org.checkerframework.checker.nullness.compatqual.NullableType" />
           <item index="10" class="java.lang.String" itemvalue="androidx.annotation.RecentlyNullable" />
           <item index="11" class="java.lang.String" itemvalue="com.android.annotations.Nullable" />
+          <item index="12" class="java.lang.String" itemvalue="org.jspecify.nullness.Nullable" />
+          <item index="13" class="java.lang.String" itemvalue="org.eclipse.jdt.annotation.Nullable" />
+          <item index="14" class="java.lang.String" itemvalue="jakarta.annotation.Nullable" />
         </list>
       </value>
     </option>
     <option name="myNotNulls">
       <value>
-        <list size="11">
+        <list size="15">
           <item index="0" class="java.lang.String" itemvalue="org.jetbrains.annotations.NotNull" />
           <item index="1" class="java.lang.String" itemvalue="javax.annotation.Nonnull" />
           <item index="2" class="java.lang.String" itemvalue="edu.umd.cs.findbugs.annotations.NonNull" />
@@ -76,6 +80,10 @@
           <item index="8" class="java.lang.String" itemvalue="org.checkerframework.checker.nullness.compatqual.NonNullType" />
           <item index="9" class="java.lang.String" itemvalue="androidx.annotation.RecentlyNonNull" />
           <item index="10" class="java.lang.String" itemvalue="com.android.annotations.NonNull" />
+          <item index="11" class="java.lang.String" itemvalue="jakarta.annotation.Nonnull" />
+          <item index="12" class="java.lang.String" itemvalue="lombok.NonNull" />
+          <item index="13" class="java.lang.String" itemvalue="org.jspecify.nullness.NonNull" />
+          <item index="14" class="java.lang.String" itemvalue="org.eclipse.jdt.annotation.NonNull" />
         </list>
       </value>
     </option>
@@ -84,7 +92,7 @@
     <resource url="http://maven.apache.org/ASSEMBLY/2.0.0" location="$PROJECT_DIR$/.idea/xml-schemas/assembly-2.0.0.xsd" />
     <resource url="http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd" location="$PROJECT_DIR$/.idea/xml-schemas/svg11.dtd" />
   </component>
-  <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="false" project-jdk-name="1.8" project-jdk-type="JavaSDK">
+  <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK">
     <output url="file://$PROJECT_DIR$/classes" />
   </component>
-</project>
+</project>
\ No newline at end of file
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
index bd1db5c1f0..f68ddfa901 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.azure;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
@@ -35,12 +36,15 @@ import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
 import org.apache.druid.storage.azure.AzureInputDataConfig;
 import org.apache.druid.storage.azure.AzureStorage;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * Abstracts the Azure storage system where input data is stored. Allows users to retrieve entities in
@@ -77,6 +81,14 @@ public class AzureInputSource extends CloudObjectInputSource
     this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "AzureInputDataConfig");
   }
 
+  @JsonIgnore
+  @Nonnull
+  @Override
+  public Set<String> getTypes()
+  {
+    return Collections.singleton(SCHEME);
+  }
+
   @Override
   public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
   {
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
index a8fc402eb8..eefa5bfcb9 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.azure;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.data.input.InputSplit;
@@ -265,6 +266,23 @@ public class AzureInputSourceTest extends EasyMockSupport
     Assert.assertEquals("AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[], objectGlob=null}", actualToString);
   }
 
+  @Test
+  public void test_getTypes_returnsExpectedTypes()
+  {
+    List<URI> prefixes = ImmutableList.of(PREFIX_URI);
+    azureInputSource = new AzureInputSource(
+        storage,
+        entityFactory,
+        azureCloudBlobIterableFactory,
+        inputDataConfig,
+        EMPTY_URIS,
+        prefixes,
+        EMPTY_OBJECTS,
+        null
+    );
+    Assert.assertEquals(ImmutableSet.of(AzureInputSource.SCHEME), azureInputSource.getTypes());
+  }
+
   @Test
   public void abidesEqualsContract()
   {
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
index 55f6d173ec..3e2e50d6bc 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.google;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.common.collect.Iterators;
@@ -36,15 +37,19 @@ import org.apache.druid.storage.google.GoogleStorage;
 import org.apache.druid.storage.google.GoogleStorageDruidModule;
 import org.apache.druid.storage.google.GoogleUtils;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.net.URI;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 public class GoogleCloudStorageInputSource extends CloudObjectInputSource
 {
+  static final String TYPE_KEY = GoogleStorageDruidModule.SCHEME;
   private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class);
 
   private final GoogleStorage storage;
@@ -65,6 +70,14 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
     this.inputDataConfig = inputDataConfig;
   }
 
+  @JsonIgnore
+  @Nonnull
+  @Override
+  public Set<String> getTypes()
+  {
+    return Collections.singleton(TYPE_KEY);
+  }
+
   @Override
   protected InputEntity createEntity(CloudObjectLocation location)
   {
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
index 41308e54c2..404fd45d4d 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
@@ -153,6 +153,14 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
     Assert.assertEquals(withObjects, serdeWithObjects);
   }
 
+  @Test
+  public void testGetTypes()
+  {
+    final GoogleCloudStorageInputSource inputSource =
+        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
+    Assert.assertEquals(Collections.singleton(GoogleCloudStorageInputSource.TYPE_KEY), inputSource.getTypes());
+  }
+
   @Test
   public void testWithUrisSplit() throws Exception
   {
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index 7faebccf26..9e76a69ec3 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -21,6 +21,7 @@ package org.apache.druid.inputsource.hdfs;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -38,6 +39,7 @@ import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.guice.Hdfs;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
 import org.apache.druid.utils.Streams;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +50,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
@@ -56,11 +59,13 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class HdfsInputSource extends AbstractInputSource implements SplittableInputSource<List<Path>>
 {
+  static final String TYPE_KEY = HdfsStorageDruidModule.SCHEME;
   private static final String PROP_PATHS = "paths";
 
   private final List<String> inputPaths;
@@ -91,6 +96,14 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
     this.inputPaths.forEach(p -> verifyProtocol(configuration, inputSourceConfig, p));
   }
 
+  @JsonIgnore
+  @Nonnull
+  @Override
+  public Set<String> getTypes()
+  {
+    return Collections.singleton(TYPE_KEY);
+  }
+
   public static List<String> coerceInputPathsToList(Object inputPaths, String propertyName)
   {
     if (inputPaths instanceof String) {
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index 68c7650960..a1443ea50b 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -50,7 +50,7 @@ import java.util.Properties;
  */
 public class HdfsStorageDruidModule implements DruidModule
 {
-  static final String SCHEME = "hdfs";
+  public static final String SCHEME = "hdfs";
   private Properties props = null;
 
   @Inject
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index e1f112ad00..879e49f0c7 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -163,6 +163,20 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
                      .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
                      .build();
     }
+
+    @Test
+    public void testGetTypes()
+    {
+      final Configuration conf = new Configuration();
+      conf.set("fs.default.name", "hdfs://localhost:7020");
+      HdfsInputSource inputSource = HdfsInputSource.builder()
+                     .paths("/foo/bar*")
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      Assert.assertEquals(Collections.singleton(HdfsInputSource.TYPE_KEY), inputSource.getTypes());
+    }
   }
 
   public static class SerializeDeserializeTest
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 700d2d12d9..8ba36124c2 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
@@ -30,12 +31,20 @@ import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 
+import javax.annotation.Nonnull;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>
 {
+  public static final String INPUT_SOURCE_TYPE = "kafka";
   private static final String TYPE = "index_kafka";
 
   private final ObjectMapper configMapper;
@@ -132,6 +141,17 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
     return TYPE;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    return Collections.singleton(new ResourceAction(
+        new Resource(ResourceType.EXTERNAL, INPUT_SOURCE_TYPE),
+        Action.READ
+    ));
+  }
+
   @Override
   public boolean supportsQueries()
   {
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index 55f69e1cda..f2bee7f431 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka.supervisor;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.guice.annotations.Json;
@@ -35,13 +36,20 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
 {
-  private static final String TASK_TYPE = "kafka";
+  static final String TASK_TYPE = "kafka";
 
   @JsonCreator
   public KafkaSupervisorSpec(
@@ -92,6 +100,17 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
     return TASK_TYPE;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceTypes()
+  {
+    return Collections.singleton(new ResourceAction(
+        new Resource(ResourceType.EXTERNAL, TASK_TYPE),
+        Action.READ
+    ));
+  }
+
   @Override
   public String getSource()
   {
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 0ba17cdc38..42f2154f20 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -111,6 +111,10 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -2672,6 +2676,45 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     Assert.assertEquals(task, task1);
   }
 
+  @Test
+  public void testCorrectInputSources() throws Exception
+  {
+    // This is both a serde test and a regression test for https://github.com/apache/druid/issues/7724.
+
+    final KafkaIndexTask task = createTask(
+        "taskid",
+        NEW_DATA_SCHEMA.withTransformSpec(
+            new TransformSpec(
+                null,
+                ImmutableList.of(new ExpressionTransform("beep", "nofunc()", ExprMacroTable.nil()))
+            )
+        ),
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence",
+            new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(), ImmutableSet.of()),
+            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of()),
+            ImmutableMap.of(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            INPUT_FORMAT,
+            null
+        )
+    );
+
+    Assert.assertEquals(
+        Collections.singleton(
+            new ResourceAction(new Resource(
+                ResourceType.EXTERNAL,
+                KafkaIndexTask.INPUT_SOURCE_TYPE
+            ), Action.READ)),
+        task.getInputSourceResources()
+    );
+  }
+
+
   /**
    * Wait for a task to consume certain offsets (inclusive).
    */
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 28b915d22c..09a8a7a645 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -91,6 +91,10 @@ import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
@@ -433,6 +437,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
             Long.MAX_VALUE,
             (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(2)
     );
+    Assert.assertEquals(
+        Collections.singleton(new ResourceAction(
+            new Resource(ResourceType.EXTERNAL, KafkaSupervisorSpec.TASK_TYPE),
+            Action.READ
+        )),
+        testableSupervisorSpec.getInputSourceTypes()
+    );
 
     autoscaler.reset();
     autoscaler.stop();
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index d8fbd6a36b..dc3d64af36 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.kinesis;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -34,12 +35,20 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.utils.RuntimeInfo;
 
+import javax.annotation.Nonnull;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
 {
+  public static final String INPUT_SOURCE_TYPE = "kinesis";
   private static final String TYPE = "index_kinesis";
   private static final Logger log = new Logger(KinesisIndexTask.class);
 
@@ -147,6 +156,17 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
     return TYPE;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    return Collections.singleton(new ResourceAction(
+        new Resource(ResourceType.EXTERNAL, INPUT_SOURCE_TYPE),
+        Action.READ
+    ));
+  }
+
   @Override
   public boolean supportsQueries()
   {
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index f210ca6976..026e4ac4fb 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.kinesis.supervisor;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.name.Named;
@@ -38,13 +39,20 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
 {
-  private static final String SUPERVISOR_TYPE = "kinesis";
+  static final String SUPERVISOR_TYPE = "kinesis";
   private final AWSCredentialsConfig awsCredentialsConfig;
 
   @JsonCreator
@@ -114,6 +122,17 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
     return SUPERVISOR_TYPE;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceTypes()
+  {
+    return Collections.singleton(new ResourceAction(
+        new Resource(ResourceType.EXTERNAL, SUPERVISOR_TYPE),
+        Action.READ
+    ));
+  }
+
   @Override
   public String getSource()
   {
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index 9d9b3dd417..097a7784a8 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -36,6 +36,10 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -127,6 +131,14 @@ public class KinesisIndexTaskSerdeTest
     Assert.assertEquals(ACCESS_KEY, awsCredentialsConfig.getAccessKey().getPassword());
     Assert.assertEquals(SECRET_KEY, awsCredentialsConfig.getSecretKey().getPassword());
     Assert.assertEquals(FILE_SESSION_CREDENTIALS, awsCredentialsConfig.getFileSessionCredentials());
+    Assert.assertEquals(
+        Collections.singleton(
+            new ResourceAction(new Resource(
+                ResourceType.EXTERNAL,
+                KinesisIndexTask.INPUT_SOURCE_TYPE
+            ), Action.READ)),
+        target.getInputSourceResources()
+    );
   }
 
   private static ObjectMapper createObjectMapper()
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 265a9fc144..079e5314e8 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -88,6 +88,10 @@ import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
@@ -4080,6 +4084,59 @@ public class KinesisSupervisorTest extends EasyMockSupport
     testShardSplitPhaseThree(phaseTwoTasks);
   }
 
+  @Test
+  public void testCorrectInputSources()
+  {
+    KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
+        null,
+        dataSchema,
+        null,
+        new KinesisSupervisorIOConfig(
+            STREAM,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            true,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            false
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Assert.assertEquals(
+        Collections.singleton(
+            new ResourceAction(
+                new Resource(ResourceType.EXTERNAL, KinesisSupervisorSpec.SUPERVISOR_TYPE),
+                Action.READ
+            )),
+        supervisorSpec.getInputSourceTypes()
+    );
+  }
+
   private List<Task> testShardSplitPhaseOne() throws Exception
   {
     supervisorRecordSupplier.assign(EasyMock.anyObject());
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index 7a52b672f7..106d33216f 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -30,6 +30,7 @@ import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
 import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
@@ -56,13 +57,16 @@ import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.net.URI;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 
 public class S3InputSource extends CloudObjectInputSource
 {
+  public static final String TYPE_KEY = S3StorageDruidModule.SCHEME;
   // We lazily initialize ServerSideEncryptingAmazonS3 to avoid costly s3 operation when we only need S3InputSource
   // for stored information (such as for task logs) and not for ingestion.
   // (This cost only applies for new ServerSideEncryptingAmazonS3 created with s3InputSourceConfig given).
@@ -236,6 +240,14 @@ public class S3InputSource extends CloudObjectInputSource
     this.maxRetries = maxRetries;
   }
 
+  @JsonIgnore
+  @Nonnull
+  @Override
+  public Set<String> getTypes()
+  {
+    return Collections.singleton(TYPE_KEY);
+  }
+
   private void applyAssumeRole(
       ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
       S3InputSourceConfig s3InputSourceConfig,
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index c742af8e5f..fc538b682f 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -336,6 +336,25 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
   }
 
+  @Test
+  public void testGetTypes()
+  {
+    final S3InputSource inputSource = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        EXPECTED_URIS,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(Collections.singleton(S3InputSource.TYPE_KEY), inputSource.getTypes());
+  }
+
   @Test
   public void testS3InputSourceUseEndPointClientProxy()
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 8387d137f1..0428b8e42f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -30,6 +30,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.curator.shaded.com.google.common.base.Verify;
@@ -90,6 +91,7 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
@@ -395,6 +397,14 @@ public class CompactionTask extends AbstractBatchIndexTask
     return TYPE;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    return ImmutableSet.of();
+  }
+
   @Override
   public int getPriority()
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index e2b5bd9917..d81167fddf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -67,11 +67,15 @@ import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.util.ToolRunner;
 import org.joda.time.Interval;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.GET;
@@ -85,9 +89,11 @@ import java.io.File;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class HadoopIndexTask extends HadoopTask implements ChatHandler
@@ -193,6 +199,14 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     return "index_hadoop";
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    return Collections.singleton(new ResourceAction(new Resource(ResourceType.EXTERNAL, "hadoop"), Action.READ));
+  }
+
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 31acf6e6b6..b4d4708082 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.common.task;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,6 +30,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -97,6 +99,9 @@ import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -130,6 +135,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 {
@@ -288,6 +294,22 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     }
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    if (ingestionSchema.getIOConfig().firehoseFactory != null) {
+      throw getInputSecurityOnFirehoseUnsupportedError();
+    }
+    return getIngestionSchema().getIOConfig().getInputSource() != null ?
+           getIngestionSchema().getIOConfig().getInputSource().getTypes()
+               .stream()
+               .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
+               .collect(Collectors.toSet()) :
+           ImmutableSet.of();
+  }
+
   @GET
   @Path("/unparseableEvents")
   @Produces(MediaType.APPLICATION_JSON)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index bce3eb4183..e4c112100e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.indexer.TaskStatus;
@@ -33,9 +34,12 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.security.ResourceAction;
 
+import javax.annotation.Nonnull;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 /**
@@ -100,6 +104,14 @@ public class NoopTask extends AbstractTask
     return "noop";
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    return ImmutableSet.of();
+  }
+
   @JsonProperty
   public long getRunTime()
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 9a9d49670d..1cc2d329c8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -36,10 +37,15 @@ import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmen
 import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
 import org.apache.druid.indexing.common.task.batch.parallel.PartialRangeSegmentGenerateTask;
 import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
+import org.apache.druid.server.security.ResourceAction;
 
+import javax.annotation.Nonnull;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Represents a task that can run on a worker. The general contracts surrounding Tasks are:
@@ -138,6 +144,33 @@ public interface Task
    */
   String getDataSource();
 
+  /**
+   * @return The types of {@link org.apache.druid.data.input.InputSource} that the task uses. Empty set is returned if
+   * the task does not use any. Users can be given permission to access particular types of
+   * input sources but not others, using the
+   * {@link org.apache.druid.server.security.AuthConfig#enableInputSourceSecurity} config.
+   * @throws UnsupportedOperationException if the given task type does not suppoert input source based security. Such
+   * would be the case, if the task uses firehose.
+   */
+  @JsonIgnore
+  @Nonnull
+  default Set<ResourceAction> getInputSourceResources() throws UOE
+  {
+    throw new UOE(StringUtils.format(
+        "Task type [%s], does not support input source based security",
+        getType()
+    ));
+  }
+
+  default UOE getInputSecurityOnFirehoseUnsupportedError()
+  {
+    throw new UOE(StringUtils.format(
+        "Input source based security cannot be performed '%s' task because it uses firehose."
+        + " Change the tasks configuration, or disable `isEnableInputSourceSecurity`",
+        getType()
+    ));
+  }
+
   /**
    * Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
    * should return null.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
index 71dbec7bfe..683a406c42 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
@@ -20,11 +20,20 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class LegacySinglePhaseSubTask extends SinglePhaseSubTask
 {
@@ -56,4 +65,20 @@ public class LegacySinglePhaseSubTask extends SinglePhaseSubTask
   {
     return SinglePhaseSubTask.OLD_TYPE_NAME;
   }
+
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
+      throw getInputSecurityOnFirehoseUnsupportedError();
+    }
+    return getIngestionSchema().getIOConfig().getInputSource() != null ?
+           getIngestionSchema().getIOConfig().getInputSource().getTypes()
+                               .stream()
+                               .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
+                               .collect(Collectors.toSet()) :
+           ImmutableSet.of();
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 90850eb2bd..852df1de85 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -20,12 +20,14 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.datasketches.hll.HllSketch;
 import org.apache.datasketches.hll.Union;
 import org.apache.datasketches.memory.Memory;
@@ -76,6 +78,9 @@ import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.segment.realtime.firehose.ChatHandlers;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.BuildingShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -270,6 +275,22 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     return TYPE;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
+      throw getInputSecurityOnFirehoseUnsupportedError();
+    }
+    return getIngestionSchema().getIOConfig().getInputSource() != null ?
+           getIngestionSchema().getIOConfig().getInputSource().getTypes()
+                               .stream()
+                               .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
+                               .collect(Collectors.toSet()) :
+           ImmutableSet.of();
+  }
+
   @JsonProperty("spec")
   public ParallelIndexIngestionSpec getIngestionSchema()
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index 250df1afc3..1cefdb5ae2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -21,11 +21,13 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import org.apache.datasketches.hll.HllSketch;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
@@ -43,16 +45,23 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.partition.HashPartitioner;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
 {
@@ -133,6 +142,22 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
     return TYPE;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
+      throw getInputSecurityOnFirehoseUnsupportedError();
+    }
+    return getIngestionSchema().getIOConfig().getInputSource() != null ?
+           getIngestionSchema().getIOConfig().getInputSource().getTypes()
+                               .stream()
+                               .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
+                               .collect(Collectors.toSet()) :
+           ImmutableSet.of();
+  }
+
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index 3764ec3884..bcb4403e57 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -20,10 +20,12 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.hash.BloomFilter;
 import com.google.common.hash.Funnels;
@@ -49,14 +51,21 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.joda.time.Interval;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /**
  * The worker task of {@link PartialDimensionDistributionParallelIndexTaskRunner}. This task
@@ -174,6 +183,22 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
     return TYPE;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
+      throw getInputSecurityOnFirehoseUnsupportedError();
+    }
+    return getIngestionSchema().getIOConfig().getInputSource() != null ?
+           getIngestionSchema().getIOConfig().getInputSource().getTypes()
+                               .stream()
+                               .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
+                               .collect(Collectors.toSet()) :
+           ImmutableSet.of();
+  }
+
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index 1eb141c3f5..d79ce2b747 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -20,7 +20,9 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -33,6 +35,10 @@ import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultInde
 import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
 import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.joda.time.Interval;
@@ -42,6 +48,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -131,6 +138,22 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
     return TYPE;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
+      throw getInputSecurityOnFirehoseUnsupportedError();
+    }
+    return getIngestionSchema().getIOConfig().getInputSource() != null ?
+           getIngestionSchema().getIOConfig().getInputSource().getTypes()
+                               .stream()
+                               .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
+                               .collect(Collectors.toSet()) :
+           ImmutableSet.of();
+  }
+
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index ab966e67d5..8402ad6b58 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -20,8 +20,10 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
@@ -35,14 +37,20 @@ import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
 import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
 import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
 import org.joda.time.Interval;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -148,6 +156,22 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
     return TYPE;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
+      throw getInputSecurityOnFirehoseUnsupportedError();
+    }
+    return getIngestionSchema().getIOConfig().getInputSource() != null ?
+           getIngestionSchema().getIOConfig().getInputSource().getTypes()
+                               .stream()
+                               .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
+                               .collect(Collectors.toSet()) :
+           ImmutableSet.of();
+  }
+
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws IOException
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 03a49efaf5..8706f8227a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -27,6 +28,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.indexer.IngestionState;
@@ -70,6 +72,9 @@ import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
@@ -77,6 +82,7 @@ import org.apache.druid.timeline.partition.PartitionChunk;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.Interval;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.GET;
@@ -96,6 +102,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 /**
  * The worker task of {@link SinglePhaseParallelIndexTaskRunner}. Similar to {@link IndexTask}, but this task
@@ -190,6 +197,22 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
     return TYPE;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
+      throw getInputSecurityOnFirehoseUnsupportedError();
+    }
+    return getIngestionSchema().getIOConfig().getInputSource() != null ?
+           getIngestionSchema().getIOConfig().getInputSource().getTypes()
+                               .stream()
+                               .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
+                               .collect(Collectors.toSet()) :
+           ImmutableSet.of();
+  }
+
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws IOException
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index ad3e57cba7..a64b67b3b8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.overlord.http;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -61,6 +62,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.metadata.TaskLookup;
@@ -73,6 +75,7 @@ import org.apache.druid.server.http.security.DatasourceResourceFilter;
 import org.apache.druid.server.http.security.StateResourceFilter;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ForbiddenException;
@@ -106,6 +109,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -131,6 +135,8 @@ public class OverlordResource
   private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter;
   private final ProvisioningStrategy provisioningStrategy;
 
+  private final AuthConfig authConfig;
+
   private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
   private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete");
 
@@ -162,7 +168,8 @@ public class OverlordResource
       AuditManager auditManager,
       AuthorizerMapper authorizerMapper,
       WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter,
-      ProvisioningStrategy provisioningStrategy
+      ProvisioningStrategy provisioningStrategy,
+      AuthConfig authConfig
   )
   {
     this.taskMaster = taskMaster;
@@ -174,6 +181,7 @@ public class OverlordResource
     this.authorizerMapper = authorizerMapper;
     this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter;
     this.provisioningStrategy = provisioningStrategy;
+    this.authConfig = authConfig;
   }
 
   /**
@@ -187,15 +195,24 @@ public class OverlordResource
   @Produces(MediaType.APPLICATION_JSON)
   public Response taskPost(final Task task, @Context final HttpServletRequest req)
   {
-    final String dataSource = task.getDataSource();
-    final ResourceAction resourceAction = new ResourceAction(
-        new Resource(dataSource, ResourceType.DATASOURCE),
-        Action.WRITE
-    );
+    final Set<ResourceAction> resourceActions;
+    try {
+      resourceActions = getNeededResourceActionsForTask(task);
+    }
+    catch (UOE e) {
+      return Response.status(Response.Status.BAD_REQUEST)
+          .entity(
+              ImmutableMap.of(
+                  "error",
+                  e.getMessage()
+              )
+          )
+          .build();
+    }
 
-    Access authResult = AuthorizationUtils.authorizeResourceAction(
+    Access authResult = AuthorizationUtils.authorizeAllResourceActions(
         req,
-        resourceAction,
+        resourceActions,
         authorizerMapper
     );
 
@@ -1086,6 +1103,18 @@ public class OverlordResource
     }
   }
 
+  @VisibleForTesting
+  Set<ResourceAction> getNeededResourceActionsForTask(Task task) throws UOE
+  {
+    final String dataSource = task.getDataSource();
+    final Set<ResourceAction> resourceActions = new HashSet<>();
+    resourceActions.add(new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE));
+    if (authConfig.isEnableInputSourceSecurity()) {
+      resourceActions.addAll(task.getInputSourceResources());
+    }
+    return resourceActions;
+  }
+
   private List<TaskStatusPlus> securedTaskStatusPlus(
       List<TaskStatusPlus> collectionToFilter,
       @Nullable String dataSource,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 8daedf81c9..5fb5722dca 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -80,6 +80,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
@@ -1280,6 +1281,20 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
     Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
   }
 
+  @Test(timeout = 60_000L)
+  public void testInputSourceResourcesThrowException()
+  {
+    // Expect 2 segments as we will hit maxTotalRows
+    expectPublishedSegments(2);
+
+    final AppenderatorDriverRealtimeIndexTask task =
+        makeRealtimeTask(null, Integer.MAX_VALUE, 1500L);
+    Assert.assertThrows(
+        UOE.class,
+        task::getInputSourceResources
+    );
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index f91e67dc7c..3a713f74e4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -62,6 +62,10 @@ import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.NoopSegmentCacheManager;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
@@ -87,6 +91,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -922,6 +927,15 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
         null
     );
 
+    Assert.assertEquals(
+        Collections.singleton(
+            new ResourceAction(new Resource(
+                ResourceType.EXTERNAL,
+                LocalInputSource.TYPE_KEY
+            ), Action.READ)),
+        indexTask.getInputSourceResources()
+    );
+
     runTask(indexTask);
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 19b178caa3..9815dde8fe 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -673,6 +673,25 @@ public class CompactionTaskTest
     assertEquals(expectedFromJson, fromJson);
   }
 
+  @Test
+  public void testInputSourceResources()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask task = builder
+        .inputSpec(
+            new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))
+        )
+        .tuningConfig(createTuningConfig())
+        .context(ImmutableMap.of("testKey", "testContext"))
+        .build();
+
+    Assert.assertTrue(task.getInputSourceResources().isEmpty());
+  }
+
   @Test
   public void testGetTuningConfigWithIndexTuningConfig()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java
new file mode 100644
index 0000000000..a99ad7b6bc
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.HadoopIOConfig;
+import org.apache.druid.indexer.HadoopIngestionSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class HadoopIndexTaskTest
+{
+  private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+  @Test
+  public void testCorrectInputSourceTypes()
+  {
+    final HadoopIndexTask task = new HadoopIndexTask(
+        null,
+        new HadoopIngestionSpec(
+            new DataSchema(
+                "foo", null, new AggregatorFactory[0], new UniformGranularitySpec(
+                Granularities.DAY,
+                null,
+                ImmutableList.of(Intervals.of("2010-01-01/P1D"))
+            ),
+                null,
+                jsonMapper
+            ), new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), null
+        ),
+        null,
+        null,
+        "blah",
+        jsonMapper,
+        null,
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null
+    );
+
+    Assert.assertEquals(
+        Collections.singleton(
+            new ResourceAction(new Resource(
+                ResourceType.EXTERNAL,
+                "hadoop"
+            ), Action.READ)),
+        task.getInputSourceResources()
+    );
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 5b6dcbfcc4..c12e9c4308 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -91,6 +91,10 @@ import org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFacto
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -209,6 +213,54 @@ public class IndexTaskTest extends IngestionTestBase
     taskRunner = new TestTaskRunner();
   }
 
+  @Test
+  public void testCorrectInputSourceTypes() throws IOException
+  {
+    File tmpDir = temporaryFolder.newFolder();
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        new IndexIngestionSpec(
+            new DataSchema(
+                "test-json",
+                DEFAULT_TIMESTAMP_SPEC,
+                new DimensionsSpec(
+                    ImmutableList.of(
+                        new StringDimensionSchema("ts"),
+                        new StringDimensionSchema("dim"),
+                        new LongDimensionSchema("valDim")
+                    )
+                ),
+                new AggregatorFactory[]{new LongSumAggregatorFactory("valMet", "val")},
+                new UniformGranularitySpec(
+                    Granularities.DAY,
+                    Granularities.MINUTE,
+                    Collections.singletonList(Intervals.of("2014/P1D"))
+                ),
+                null
+            ),
+            new IndexIOConfig(
+                null,
+                new LocalInputSource(tmpDir, "druid*"),
+                DEFAULT_INPUT_FORMAT,
+                false,
+                false
+            ),
+            createTuningConfigWithMaxRowsPerSegment(10, true)
+        ),
+        null
+    );
+
+    Assert.assertEquals(
+        Collections.singleton(
+            new ResourceAction(new Resource(
+                ResourceType.EXTERNAL,
+                LocalInputSource.TYPE_KEY
+            ), Action.READ)),
+        indexTask.getInputSourceResources()
+    );
+  }
+
   @Test
   public void testIngestNullOnlyColumns() throws Exception
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTaskTest.java
new file mode 100644
index 0000000000..ad7a61fa20
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTaskTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NoopTaskTest
+{
+  @Test
+  public void testNullInputSources()
+  {
+    NoopTask task = new NoopTask("myID", null, null, 1, 0, null, null, null);
+    Assert.assertTrue(task.getInputSourceResources().isEmpty());
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index c3c1f4b780..3e5d1df89c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -68,6 +68,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
@@ -199,6 +200,16 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
     Assert.assertTrue(task.supportsQueries());
   }
 
+  @Test(timeout = 60_000L)
+  public void testInputSourceTypes()
+  {
+    final RealtimeIndexTask task = makeRealtimeTask(null);
+    Assert.assertThrows(
+        UOE.class,
+        task::getInputSourceResources
+    );
+  }
+
   @Test(timeout = 60_000L, expected = ExecutionException.class)
   public void testHandoffTimeout() throws Exception
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
new file mode 100644
index 0000000000..ca8458cf7f
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TaskTest
+{
+  private static final Task TASK = new Task()
+  {
+    @Override
+    public String getId()
+    {
+      return null;
+    }
+
+    @Override
+    public String getGroupId()
+    {
+      return null;
+    }
+
+    @Override
+    public TaskResource getTaskResource()
+    {
+      return null;
+    }
+
+    @Override
+    public String getType()
+    {
+      return null;
+    }
+
+    @Override
+    public String getNodeType()
+    {
+      return null;
+    }
+
+    @Override
+    public String getDataSource()
+    {
+      return null;
+    }
+
+    @Override
+    public <T> QueryRunner<T> getQueryRunner(Query<T> query)
+    {
+      return null;
+    }
+
+    @Override
+    public boolean supportsQueries()
+    {
+      return false;
+    }
+
+    @Override
+    public String getClasspathPrefix()
+    {
+      return null;
+    }
+
+    @Override
+    public boolean isReady(TaskActionClient taskActionClient)
+    {
+      return false;
+    }
+
+    @Override
+    public boolean canRestore()
+    {
+      return false;
+    }
+
+    @Override
+    public void stopGracefully(TaskConfig taskConfig)
+    {
+
+    }
+
+    @Override
+    public TaskStatus run(TaskToolbox toolbox)
+    {
+      return null;
+    }
+
+    @Override
+    public Map<String, Object> getContext()
+    {
+      return null;
+    }
+  };
+
+  @Test
+  public void testGetInputSourceTypes()
+  {
+    Assert.assertThrows(
+        UOE.class,
+        TASK::getInputSourceResources
+    );
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
index 1e529cbfe6..d8e64b0539 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
@@ -44,6 +44,10 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.testing.junit.LoggerCaptureRule;
 import org.apache.logging.log4j.core.LogEvent;
 import org.easymock.Capture;
@@ -114,6 +118,21 @@ public class PartialDimensionCardinalityTaskTest
       TestHelper.testSerializesDeserializes(OBJECT_MAPPER, task);
     }
 
+    @Test
+    public void hasCorrectInputSourceTypes()
+    {
+      PartialDimensionCardinalityTask task = new PartialDimensionCardinalityTaskBuilder()
+          .build();
+      Assert.assertEquals(
+          Collections.singleton(
+              new ResourceAction(new Resource(
+                  ResourceType.EXTERNAL,
+                  InlineInputSource.TYPE_KEY
+              ), Action.READ)),
+          task.getInputSourceResources()
+      );
+    }
+
     @Test
     public void hasCorrectPrefixForAutomaticId()
     {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
index 49fb404138..2a9109e680 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
@@ -37,9 +37,12 @@ import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactor
 import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
 import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.testing.junit.LoggerCaptureRule;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
 import org.apache.logging.log4j.core.LogEvent;
@@ -108,14 +111,6 @@ public class PartialDimensionDistributionTaskTest
           .build();
     }
 
-    @Test
-    public void serializesDeserializes()
-    {
-      PartialDimensionDistributionTask task = new PartialDimensionDistributionTaskBuilder()
-          .build();
-      TestHelper.testSerializesDeserializes(OBJECT_MAPPER, task);
-    }
-
     @Test
     public void hasCorrectPrefixForAutomaticId()
     {
@@ -375,6 +370,22 @@ public class PartialDimensionDistributionTaskTest
       Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
     }
 
+    @Test
+    public void testInputSourceResources()
+    {
+      PartialDimensionDistributionTask task = new PartialDimensionDistributionTaskBuilder()
+          .build();
+
+      Assert.assertEquals(
+          Collections.singleton(
+              new ResourceAction(
+                  new Resource(ResourceType.EXTERNAL, InlineInputSource.TYPE_KEY),
+                  Action.READ
+              )),
+          task.getInputSourceResources()
+      );
+    }
+
     private DimensionDistributionReport runTask(PartialDimensionDistributionTaskBuilder taskBuilder)
     {
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
index 749921fff3..37ff1a2150 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
@@ -30,6 +30,10 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.hamcrest.Matchers;
 import org.joda.time.Interval;
 import org.junit.Assert;
@@ -39,6 +43,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -86,6 +91,19 @@ public class PartialHashSegmentGenerateTaskTest
     Assert.assertThat(id, Matchers.startsWith(PartialHashSegmentGenerateTask.TYPE));
   }
 
+  @Test
+  public void hasCorrectInputSourceTypes()
+  {
+    Assert.assertEquals(
+        Collections.singleton(
+            new ResourceAction(new Resource(
+                ResourceType.EXTERNAL,
+                LocalInputSource.TYPE_KEY
+            ), Action.READ)),
+        target.getInputSourceResources()
+    );
+  }
+
   @Test
   public void testCreateHashPartitionAnalysisFromPartitionsSpecWithNumShardsReturningAnalysisOfValidNumBuckets()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java
index 09016e192e..f2a235ef71 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java
@@ -30,6 +30,10 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
@@ -101,6 +105,20 @@ public class PartialRangeSegmentGenerateTaskTest extends AbstractParallelIndexSu
     TestHelper.testSerializesDeserializes(getObjectMapper(), task);
   }
 
+  @Test
+  public void hasCorrectInputSourceTypes()
+  {
+    PartialRangeSegmentGenerateTask task = new PartialRangeSegmentGenerateTaskBuilder().build();
+    Assert.assertEquals(
+        Collections.singleton(
+            new ResourceAction(new Resource(
+                ResourceType.EXTERNAL,
+                InlineInputSource.TYPE_KEY
+            ), Action.READ)),
+        task.getInputSourceResources()
+    );
+  }
+
   @Test
   public void hasCorrectPrefixForAutomaticId()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 0b5fbf2dce..e57438b8cb 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -46,6 +46,10 @@ import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentTimeline;
@@ -172,6 +176,14 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
       final TaskActionClient subTaskActionClient = createActionClient(subTask);
       prepareTaskForLocking(subTask);
       Assert.assertTrue(subTask.isReady(subTaskActionClient));
+      Assert.assertEquals(
+          Collections.singleton(
+              new ResourceAction(new Resource(
+                  ResourceType.EXTERNAL,
+                  LocalInputSource.TYPE_KEY
+              ), Action.READ)),
+          subTask.getInputSourceResources()
+      );
     }
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java
index d42f7e6920..ab260b9cf2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java
@@ -28,12 +28,17 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 
 public class SinglePhaseSubTaskSpecTest
@@ -88,5 +93,13 @@ public class SinglePhaseSubTaskSpecTest
     final byte[] json = mapper.writeValueAsBytes(expected);
     final Map<String, Object> actual = mapper.readValue(json, Map.class);
     Assert.assertEquals(SinglePhaseSubTask.OLD_TYPE_NAME, actual.get("type"));
+    Assert.assertEquals(
+        Collections.singleton(
+            new ResourceAction(new Resource(
+                ResourceType.EXTERNAL,
+                LocalInputSource.TYPE_KEY
+            ), Action.READ)),
+        expected.getInputSourceResources()
+    );
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index f01668fd37..caf8b792df 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -52,6 +52,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
 import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
 import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@@ -64,6 +65,8 @@ import org.apache.druid.server.security.Authorizer;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
@@ -87,6 +90,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class OverlordResourceTest
@@ -95,6 +99,7 @@ public class OverlordResourceTest
   private TaskMaster taskMaster;
   private JacksonConfigManager configManager;
   private ProvisioningStrategy provisioningStrategy;
+  private AuthConfig authConfig;
   private TaskStorageQueryAdapter taskStorageQueryAdapter;
   private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
   private HttpServletRequest req;
@@ -110,6 +115,7 @@ public class OverlordResourceTest
     taskRunner = EasyMock.createMock(TaskRunner.class);
     configManager = EasyMock.createMock(JacksonConfigManager.class);
     provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
+    authConfig = EasyMock.createMock(AuthConfig.class);
     taskMaster = EasyMock.createStrictMock(TaskMaster.class);
     taskStorageQueryAdapter = EasyMock.createStrictMock(TaskStorageQueryAdapter.class);
     indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
@@ -162,7 +168,8 @@ public class OverlordResourceTest
         null,
         authMapper,
         workerTaskRunnerQueryAdapter,
-        provisioningStrategy
+        provisioningStrategy,
+        authConfig
     );
   }
 
@@ -175,7 +182,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
   }
 
@@ -189,7 +197,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Response response = overlordResource.getLeader();
@@ -208,7 +217,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     // true
@@ -253,7 +263,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource.getWaitingTasks(req)
@@ -284,7 +295,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
     List<TaskStatusPlus> responseObjects = (List) overlordResource
         .getCompleteTasks(null, req).getEntity();
@@ -325,7 +337,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     List<TaskStatusPlus> responseObjects = (List) overlordResource.getRunningTasks(null, req)
@@ -373,7 +386,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
     List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
         .getTasks(null, null, null, null, null, req)
@@ -419,7 +433,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
@@ -465,7 +480,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
     List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
         .getTasks(
@@ -517,7 +533,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     List<TaskStatusPlus> responseObjects = (List) overlordResource
@@ -566,7 +583,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
@@ -600,7 +618,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
     List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
         .getTasks("complete", null, null, null, null, req)
@@ -634,7 +653,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
     String interval = "2010-01-01_P1D";
     List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
@@ -689,7 +709,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     // Verify that only the tasks of read access datasource are returned
@@ -745,7 +766,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     // Verify that only the tasks of read access datasource are returned
@@ -772,7 +794,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     // Verify that only the tasks of read access datasource are returned
@@ -805,7 +828,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
     List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
         .getTasks("complete", null, null, null, null, req)
@@ -824,7 +848,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
     Object responseObject = overlordResource
         .getTasks("blah", "ds_test", null, null, null, req)
@@ -840,6 +865,7 @@ public class OverlordResourceTest
   {
     expectedException.expect(ForbiddenException.class);
     expectAuthorizationTokenCheck();
+    EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
 
     EasyMock.replay(
         taskRunner,
@@ -847,7 +873,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
     Task task = NoopTask.create();
     overlordResource.taskPost(task, req);
@@ -857,6 +884,7 @@ public class OverlordResourceTest
   public void testTaskPostDeniesDatasourceReadUser()
   {
     expectAuthorizationTokenCheck(Users.WIKI_READER);
+    EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
 
     EasyMock.replay(
         taskRunner,
@@ -864,7 +892,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     // Verify that taskPost fails for user who has only datasource read access
@@ -895,7 +924,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Map<String, Integer> response = (Map<String, Integer>) overlordResource
@@ -924,7 +954,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Response response1 = overlordResource.getTaskPayload("mytask");
@@ -973,7 +1004,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Response response1 = overlordResource.getTaskStatus("mytask");
@@ -1031,7 +1063,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Response response = overlordResource.getDatasourceLockedIntervals(minTaskPriority);
@@ -1057,7 +1090,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     Response response = overlordResource.getDatasourceLockedIntervals(null);
@@ -1091,7 +1125,8 @@ public class OverlordResourceTest
         indexerMetadataStorageAdapter,
         req,
         mockQueue,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Map<String, Integer> response = (Map<String, Integer>) overlordResource
@@ -1142,7 +1177,8 @@ public class OverlordResourceTest
         indexerMetadataStorageAdapter,
         req,
         mockQueue,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Map<String, Integer> response = (Map<String, Integer>) overlordResource
@@ -1164,7 +1200,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Response response = overlordResource.shutdownTasksForDataSource("notExisting");
@@ -1185,7 +1222,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Response response = overlordResource.enableWorker(host);
@@ -1208,7 +1246,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Response response = overlordResource.disableWorker(host);
@@ -1231,7 +1270,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Response response = overlordResource.enableWorker(host);
@@ -1254,7 +1294,8 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         req,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        authConfig
     );
 
     final Response response = overlordResource.disableWorker(host);
@@ -1277,7 +1318,8 @@ public class OverlordResourceTest
         indexerMetadataStorageAdapter,
         req,
         workerTaskRunnerQueryAdapter,
-        configManager
+        configManager,
+        authConfig
     );
     final Response response = overlordResource.getTotalWorkerCapacity();
     Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), response.getStatus());
@@ -1296,7 +1338,8 @@ public class OverlordResourceTest
         indexerMetadataStorageAdapter,
         req,
         workerTaskRunnerQueryAdapter,
-        configManager
+        configManager,
+        authConfig
     );
     final Response response = overlordResource.getTotalWorkerCapacity();
     Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@@ -1316,7 +1359,8 @@ public class OverlordResourceTest
         indexerMetadataStorageAdapter,
         req,
         workerTaskRunnerQueryAdapter,
-        configManager
+        configManager,
+        authConfig
     );
     final Response response = overlordResource.getTotalWorkerCapacity();
     Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@@ -1337,7 +1381,8 @@ public class OverlordResourceTest
         indexerMetadataStorageAdapter,
         req,
         workerTaskRunnerQueryAdapter,
-        configManager
+        configManager,
+        authConfig
     );
     final Response response = overlordResource.getTotalWorkerCapacity();
     Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@@ -1384,7 +1429,8 @@ public class OverlordResourceTest
         req,
         workerTaskRunnerQueryAdapter,
         configManager,
-        provisioningStrategy
+        provisioningStrategy,
+        authConfig
     );
     final Response response = overlordResource.getTotalWorkerCapacity();
     Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@@ -1431,7 +1477,8 @@ public class OverlordResourceTest
         req,
         workerTaskRunnerQueryAdapter,
         configManager,
-        provisioningStrategy
+        provisioningStrategy,
+        authConfig
     );
     final Response response = overlordResource.getTotalWorkerCapacity();
     Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@@ -1439,6 +1486,108 @@ public class OverlordResourceTest
     Assert.assertEquals(invalidExpectedCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
   }
 
+  @Test
+  public void testResourceActionsForTaskWithInputTypeAndInputSecurityEnabled()
+  {
+
+    final String dataSource = "dataSourceTest";
+    final String inputSourceType = "local";
+    Task task = EasyMock.createMock(Task.class);
+
+    EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
+    EasyMock.expect(task.getDataSource()).andReturn(dataSource);
+    EasyMock.expect(task.getInputSourceResources())
+            .andReturn(ImmutableSet.of(new ResourceAction(
+                new Resource(ResourceType.EXTERNAL, inputSourceType),
+                Action.READ
+            )));
+
+    EasyMock.replay(
+        task,
+        authConfig,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    Set<ResourceAction> expectedResourceActions = ImmutableSet.of(
+        new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE),
+        new ResourceAction(new Resource(ResourceType.EXTERNAL, inputSourceType), Action.READ)
+    );
+    Set<ResourceAction> resourceActions = overlordResource.getNeededResourceActionsForTask(task);
+    Assert.assertEquals(expectedResourceActions, resourceActions);
+  }
+
+  @Test
+  public void testResourceActionsForTaskWithFirehoseAndInputSecurityEnabled()
+  {
+
+    final String dataSource = "dataSourceTest";
+    final UOE expectedException = new UOE("unsupported");
+    Task task = EasyMock.createMock(Task.class);
+
+    EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
+    EasyMock.expect(task.getId()).andReturn("taskId");
+    EasyMock.expect(task.getDataSource()).andReturn(dataSource);
+    EasyMock.expect(task.getInputSourceResources()).andThrow(expectedException);
+
+    EasyMock.replay(
+        task,
+        authConfig,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+
+    final UOE e = Assert.assertThrows(
+        UOE.class,
+        () -> overlordResource.getNeededResourceActionsForTask(task)
+    );
+
+    Assert.assertEquals(expectedException, e);
+  }
+
+  @Test
+  public void testResourceActionsForTaskWithInputTypeAndInputSecurityDisabled()
+  {
+
+    final String dataSource = "dataSourceTest";
+    final String inputSourceType = "local";
+    Task task = EasyMock.createMock(Task.class);
+
+    EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
+    EasyMock.expect(task.getDataSource()).andReturn(dataSource);
+    EasyMock.expect(task.getInputSourceResources())
+            .andReturn(ImmutableSet.of(new ResourceAction(
+                new Resource(ResourceType.EXTERNAL, inputSourceType),
+                Action.READ
+            )));
+
+    EasyMock.replay(
+        task,
+        authConfig,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    Set<ResourceAction> expectedResourceActions = ImmutableSet.of(
+        new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE)
+    );
+    Set<ResourceAction> resourceActions = overlordResource.getNeededResourceActionsForTask(task);
+    Assert.assertEquals(expectedResourceActions, resourceActions);
+  }
+
   private void expectAuthorizationTokenCheck()
   {
     expectAuthorizationTokenCheck(Users.DRUID);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 3ada645ff8..c33917de57 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -258,7 +258,8 @@ public class OverlordTest
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         workerTaskRunnerQueryAdapter,
-        null
+        null,
+        new AuthConfig()
     );
     Response response = overlordResource.getLeader();
     Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity());
diff --git a/processing/src/main/java/org/apache/druid/data/input/InputSource.java b/processing/src/main/java/org/apache/druid/data/input/InputSource.java
index 793cad2fed..be815742be 100644
--- a/processing/src/main/java/org/apache/druid/data/input/InputSource.java
+++ b/processing/src/main/java/org/apache/druid/data/input/InputSource.java
@@ -28,9 +28,12 @@ import org.apache.druid.data.input.impl.HttpInputSource;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.java.util.common.UOE;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
+import java.util.Set;
 
 /**
  * InputSource abstracts the storage system where input data is stored. It creates an {@link InputSourceReader}
@@ -87,4 +90,16 @@ public interface InputSource
       @Nullable InputFormat inputFormat,
       File temporaryDirectory
   );
+
+  /**
+   * The types of input sources uses. A set is returned here, as some InputSource implementation allow for
+   * combining of multiple input sources.
+   * @return The types of input sources uses
+   */
+  @JsonIgnore
+  @Nonnull
+  default Set<String> getTypes()
+  {
+    throw new UOE("This inputSource does not support input source based security");
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java
index 05899021a6..9e3f84fe1c 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.impl;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.AbstractInputSource;
@@ -29,10 +30,13 @@ import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.java.util.common.Pair;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Stream;
 
 /**
@@ -61,6 +65,19 @@ public class CombiningInputSource extends AbstractInputSource implements Splitta
     this.delegates = delegates;
   }
 
+  @JsonIgnore
+  @Nonnull
+  @Override
+  public Set<String> getTypes()
+  {
+    Set<String> types = new HashSet<>();
+    for (InputSource delegate : delegates) {
+      types.addAll(delegate.getTypes());
+    }
+
+    return types;
+  }
+
   @JsonProperty
   public List<SplittableInputSource> getDelegates()
   {
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
index 616f05afa5..61b4a8e675 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
@@ -34,12 +35,14 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.PasswordProvider;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.net.URI;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Stream;
 
 public class HttpInputSource extends AbstractInputSource implements SplittableInputSource<URI>
@@ -69,6 +72,14 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
     this.config = config;
   }
 
+  @JsonIgnore
+  @Nonnull
+  @Override
+  public Set<String> getTypes()
+  {
+    return Collections.singleton(TYPE_KEY);
+  }
+
   public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List<URI> uris)
   {
     for (URI uri : uris) {
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java
index eafe13aaec..319750ef3e 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java
@@ -30,9 +30,12 @@ import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.java.util.common.StringUtils;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
+import java.util.Collections;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Stream;
 
 public class InlineInputSource extends AbstractInputSource
@@ -48,6 +51,14 @@ public class InlineInputSource extends AbstractInputSource
     this.data = data;
   }
 
+  @JsonIgnore
+  @Nonnull
+  @Override
+  public Set<String> getTypes()
+  {
+    return Collections.singleton(TYPE_KEY);
+  }
+
   @JsonProperty
   public String getData()
   {
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
index 4a3dd9e257..23208c94d7 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.impl;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
@@ -45,12 +46,14 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.utils.CollectionUtils;
 import org.apache.druid.utils.Streams;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -81,6 +84,14 @@ public class LocalInputSource extends AbstractInputSource implements SplittableI
     }
   }
 
+  @JsonIgnore
+  @Nonnull
+  @Override
+  public Set<String> getTypes()
+  {
+    return Collections.singleton(TYPE_KEY);
+  }
+
   public LocalInputSource(File baseDir, String filter)
   {
     this(baseDir, filter, null);
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java
index 1db194baec..32c8117a9a 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.impl;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
@@ -42,8 +43,11 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -66,6 +70,24 @@ public class CombiningInputSourceTest
     Assert.assertEquals(combiningInputSource, fromJson);
   }
 
+  @Test
+  public void testGetTypes()
+  {
+    final ObjectMapper mapper = new ObjectMapper();
+    mapper.registerModule(new SimpleModule("test-module").registerSubtypes(TestFileInputSource.class, TestUriInputSource.class));
+    final TestFileInputSource fileSource = new TestFileInputSource(ImmutableList.of(new File("myFile").getAbsoluteFile()));
+    final TestUriInputSource uriInputSource = new TestUriInputSource(
+        ImmutableList.of(URI.create("http://test.com/http-test")));
+    final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
+        fileSource,
+        uriInputSource
+    ));
+    Set<String> expectedTypes = new HashSet<>();
+    expectedTypes.addAll(fileSource.getTypes());
+    expectedTypes.addAll(uriInputSource.getTypes());
+    Assert.assertEquals(expectedTypes, combiningInputSource.getTypes());
+  }
+
   @Test
   public void testEstimateNumSplits()
   {
@@ -201,6 +223,13 @@ public class CombiningInputSourceTest
       files = fileList;
     }
 
+    @JsonIgnore
+    @Override
+    public Set<String> getTypes()
+    {
+      return Collections.singleton("testFile");
+    }
+
     @JsonProperty
     public List<File> getFiles()
     {
@@ -261,6 +290,13 @@ public class CombiningInputSourceTest
       uris = uriList;
     }
 
+    @JsonIgnore
+    @Override
+    public Set<String> getTypes()
+    {
+      return Collections.singleton("testUri");
+    }
+
     @JsonProperty
     public List<URI> getUris()
     {
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/InlineInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/InlineInputSourceTest.java
new file mode 100644
index 0000000000..0d354d3b99
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/InlineInputSourceTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class InlineInputSourceTest
+{
+  @Test
+  public void testGetTypes()
+  {
+    InlineInputSource inputSource = new InlineInputSource("data");
+    Assert.assertEquals(Collections.singleton(InlineInputSource.TYPE_KEY), inputSource.getTypes());
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/InputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/InputSourceTest.java
new file mode 100644
index 0000000000..34c7850bf2
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/InputSourceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.java.util.common.UOE;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+public class InputSourceTest
+{
+  private static InputSource INPUT_SOURCE = new InputSource()
+  {
+    @Override
+    public boolean isSplittable()
+    {
+      return false;
+    }
+
+    @Override
+    public boolean needsFormat()
+    {
+      return false;
+    }
+
+    @Override
+    public InputSourceReader reader(
+        InputRowSchema inputRowSchema,
+        @Nullable InputFormat inputFormat,
+        File temporaryDirectory
+    )
+    {
+      return null;
+    }
+  };
+
+  @Test
+  public void testGetTypes()
+  {
+    Assert.assertThrows(UOE.class, () -> INPUT_SOURCE.getTypes());
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java
index 2917a2aa5f..c72c52f462 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java
@@ -40,6 +40,7 @@ import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -88,6 +89,13 @@ public class LocalInputSourceTest
     Assert.assertEquals(source, fromJson);
   }
 
+  @Test
+  public void testGetTypes()
+  {
+    final LocalInputSource source = new LocalInputSource(new File("myFile").getAbsoluteFile(), "myFilter");
+    Assert.assertEquals(Collections.singleton(LocalInputSource.TYPE_KEY), source.getTypes());
+  }
+
   @Test
   public void testEquals()
   {
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index b19aeaa288..1820976e87 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -20,15 +20,20 @@
 package org.apache.druid.indexing.overlord.supervisor;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.server.security.ResourceAction;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * Used as a tombstone marker in the supervisors metadata table to indicate that the supervisor has been removed.
@@ -111,6 +116,14 @@ public class NoopSupervisorSpec implements SupervisorSpec
     return type;
   }
 
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceTypes()
+  {
+    return ImmutableSet.of();
+  }
+
   @Override
   @JsonProperty("source")
   public String getSource()
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 9b44cd08dd..0386edc15c 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -19,11 +19,17 @@
 
 package org.apache.druid.indexing.overlord.supervisor;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.server.security.ResourceAction;
 
+import javax.annotation.Nonnull;
 import java.util.List;
+import java.util.Set;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {
@@ -71,6 +77,22 @@ public interface SupervisorSpec
    */
   String getType();
 
+  /**
+   * @return The types of {@link org.apache.druid.data.input.InputSource} that the task uses. Empty set is returned if
+   * the task does not use any. Users can be given permission to access particular types of
+   * input sources but not others, using the
+   * {@link org.apache.druid.server.security.AuthConfig#enableInputSourceSecurity} config.
+   */
+  @JsonIgnore
+  @Nonnull
+  default Set<ResourceAction> getInputSourceTypes() throws UnsupportedOperationException
+  {
+    throw new UOE(StringUtils.format(
+        "SuperviserSpec type [%s], does not support input source based security",
+        getType()
+    ));
+  }
+
   /**
    * This API is only used for informational purposes in
    * org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable
diff --git a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
index eb612f7095..f85adf5965 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
@@ -41,7 +41,7 @@ public class InputSourceModule implements DruidModule
     return ImmutableList.<Module>of(
         new SimpleModule("InputSourceModule")
             .registerSubtypes(
-                new NamedType(SqlInputSource.class, "sql")
+                new NamedType(SqlInputSource.class, SqlInputSource.TYPE_KEY)
             )
     );
   }
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java
index c7dfbb7fa3..0064a34310 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java
@@ -21,6 +21,7 @@ package org.apache.druid.metadata.input;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
@@ -35,15 +36,18 @@ import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Stream;
 
 public class SqlInputSource extends AbstractInputSource implements SplittableInputSource<String>
 {
+  static final String TYPE_KEY = "sql";
   private final List<String> sqls;
   private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector;
   private final ObjectMapper objectMapper;
@@ -68,6 +72,14 @@ public class SqlInputSource extends AbstractInputSource implements SplittableInp
     this.objectMapper = objectMapper;
   }
 
+  @JsonIgnore
+  @Nonnull
+  @Override
+  public Set<String> getTypes()
+  {
+    return Collections.singleton(TYPE_KEY);
+  }
+
   @JsonProperty
   public List<String> getSqls()
   {
diff --git a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
index fd5fac09e5..c0fbe92df1 100644
--- a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
+++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
@@ -64,4 +64,11 @@ public class NoopSupervisorSpecTest
     }
     Assert.assertNull(e);
   }
+
+  @Test
+  public void testInputSourceTypes()
+  {
+    NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1"));
+    Assert.assertTrue(noopSupervisorSpec.getInputSourceTypes().isEmpty());
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java
new file mode 100644
index 0000000000..4000fabad5
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor;
+
+import org.apache.druid.java.util.common.UOE;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class SupervisorSpecTest
+{
+  private static final SupervisorSpec SUPERVISOR_SPEC = new SupervisorSpec()
+  {
+    @Override
+    public String getId()
+    {
+      return null;
+    }
+
+    @Override
+    public Supervisor createSupervisor()
+    {
+      return null;
+    }
+
+    @Override
+    public List<String> getDataSources()
+    {
+      return null;
+    }
+
+    @Override
+    public String getType()
+    {
+      return null;
+    }
+
+    @Override
+    public String getSource()
+    {
+      return null;
+    }
+  };
+
+  @Test
+  public void test()
+  {
+    Assert.assertThrows(UOE.class, () -> SUPERVISOR_SPEC.getInputSourceTypes());
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
index a83dec676b..7a5ea7b214 100644
--- a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
@@ -57,6 +57,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -123,6 +124,17 @@ public class SqlInputSourceTest
     Assert.assertEquals(sqlInputSource, inputSourceFromJson);
   }
 
+  @Test
+  public void testGetTypes()
+  {
+    mapper.registerSubtypes(TestSerdeFirehoseConnector.class);
+    final SqlInputSourceTest.TestSerdeFirehoseConnector testSerdeFirehoseConnector = new SqlInputSourceTest.TestSerdeFirehoseConnector(
+        new MetadataStorageConnectorConfig());
+    final SqlInputSource sqlInputSource =
+        new SqlInputSource(SqlTestUtils.selectFrom(TABLE_1), true, testSerdeFirehoseConnector, mapper);
+    Assert.assertEquals(Collections.singleton(SqlInputSource.TYPE_KEY), sqlInputSource.getTypes());
+  }
+
   @Test
   public void testSingleSplit() throws Exception
   {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
index 71c532d736..850b472bc8 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
@@ -51,14 +51,12 @@ public class SqlResourceCollectorShuttle extends SqlShuttle
   private final Set<ResourceAction> resourceActions;
   private final PlannerContext plannerContext;
   private final SqlValidator validator;
-  private final boolean inputSourceTypeSecurityEnabled;
 
   public SqlResourceCollectorShuttle(SqlValidator validator, PlannerContext plannerContext)
   {
     this.validator = validator;
     this.resourceActions = new HashSet<>();
     this.plannerContext = plannerContext;
-    inputSourceTypeSecurityEnabled = plannerContext.getPlannerToolbox().getAuthConfig().isEnableInputSourceSecurity();
   }
 
   @Override
@@ -67,7 +65,7 @@ public class SqlResourceCollectorShuttle extends SqlShuttle
     if (call.getOperator() instanceof AuthorizableOperator) {
       resourceActions.addAll(((AuthorizableOperator) call.getOperator()).computeResources(
           call,
-          inputSourceTypeSecurityEnabled
+          plannerContext.getPlannerToolbox().getAuthConfig().isEnableInputSourceSecurity()
       ));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org