You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by za...@apache.org on 2023/03/30 02:15:41 UTC

[druid] branch master updated: Allow for Input source security in SQL layer (#13989)

This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bb67721f7 Allow for Input source security in SQL layer (#13989)
3bb67721f7 is described below

commit 3bb67721f718d54d80f8b2690f9897d8cdc98ee4
Author: zachjsh <za...@gmail.com>
AuthorDate: Wed Mar 29 22:15:33 2023 -0400

    Allow for Input source security in SQL layer (#13989)
    
    This change introduces the concept of an input source type security model, proposed in #13837. With this change, the feature is only available at the SQL layer, but we will expand it to the native layer in a follow-up PR.
    
    To enable this feature, the user must set the following property to true:
    
    druid.auth.enableInputSourceSecurity=true
    
    The default value for this property is false, which preserves the existing behavior of authorizing the use of all external sources against the hardcoded resource action
    
    new ResourceAction(new Resource(ResourceType.EXTERNAL, ResourceType.EXTERNAL), Action.READ)
    
    When this config is enabled, the users will be required to be authorized for the following resource action
    
    new ResourceAction(new Resource(ResourceType.EXTERNAL, {INPUT_SOURCE_TYPE}), Action.READ)
    
    where {INPUT_SOURCE_TYPE} is the type of the input source being used: http, inline, s3, etc.
    
    Documentation has not been added for the feature because it is not yet complete: we still need to enable it for the native layer in a follow-up PR.
---
 .../apache/druid/benchmark/query/SqlBenchmark.java |   4 +-
 .../benchmark/query/SqlExpressionBenchmark.java    |   4 +-
 .../benchmark/query/SqlNestedDataBenchmark.java    |   4 +-
 .../benchmark/query/SqlVsNativeBenchmark.java      |   4 +-
 .../org/apache/druid/msq/test/MSQTestBase.java     |   4 +-
 .../catalog/model/table/S3InputSourceDefn.java     |   3 +-
 .../catalog/model/table/S3InputSourceDefnTest.java |  12 ++
 .../catalog/model/table/BaseInputSourceDefn.java   |   6 +-
 .../catalog/model/table/ExternalTableSpec.java     |   5 +-
 .../model/table/FormattedInputSourceDefn.java      |   3 +-
 .../apache/druid/server/security/AuthConfig.java   |  31 +++++-
 .../model/table/HttpInputSourceDefnTest.java       |  10 +-
 .../model/table/InlineInputSourceDefnTest.java     |   3 +
 .../model/table/LocalInputSourceDefnTest.java      |   1 +
 .../org/apache/druid/sql/AbstractStatement.java    |   3 +-
 .../main/java/org/apache/druid/sql/SqlToolbox.java |   5 -
 .../calcite/expression/AuthorizableOperator.java   |   2 +-
 .../calcite/external/DruidExternTableMacro.java    |  99 +++++++++++++++++
 .../external/DruidExternTableMacroConversion.java  |  65 +++++++++++
 .../sql/calcite/external/DruidTableMacro.java      |   5 +
 .../external/DruidUserDefinedTableMacro.java       |   2 +-
 .../external/ExternalOperatorConversion.java       |   9 +-
 .../sql/calcite/external/ExternalTableMacro.java   |  86 ---------------
 .../druid/sql/calcite/external/Externals.java      |  16 ++-
 .../external/SchemaAwareUserDefinedTableMacro.java |  17 ++-
 .../druid/sql/calcite/planner/PlannerFactory.java  |   7 +-
 .../druid/sql/calcite/planner/PlannerToolbox.java  |  11 +-
 .../planner/SqlResourceCollectorShuttle.java       |   7 +-
 .../druid/sql/calcite/table/ExternalTable.java     |  12 +-
 .../java/org/apache/druid/sql/guice/SqlModule.java |   3 -
 .../org/apache/druid/sql/SqlStatementTest.java     |   4 +-
 .../druid/sql/avatica/DruidAvaticaHandlerTest.java |   4 +-
 .../druid/sql/avatica/DruidStatementTest.java      |   4 +-
 .../druid/sql/calcite/CalciteIngestionDmlTest.java |  10 ++
 .../druid/sql/calcite/CalciteInsertDmlTest.java    | 122 +++++++++++++++++++++
 .../calcite/SqlVectorizedExpressionSanityTest.java |   4 +-
 .../calcite/expression/ExpressionTestHelper.java   |   4 +-
 .../external/ExternalTableScanRuleTest.java        |   4 +-
 .../calcite/planner/CalcitePlannerModuleTest.java  |   4 +-
 .../sql/calcite/planner/DruidRexExecutorTest.java  |   4 +-
 .../druid/sql/calcite/util/CalciteTestBase.java    |   5 +
 .../sql/calcite/util/QueryFrameworkUtils.java      |   1 -
 .../druid/sql/calcite/util/SqlTestFramework.java   |   3 +-
 .../org/apache/druid/sql/http/SqlResourceTest.java |   4 +-
 44 files changed, 480 insertions(+), 140 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
index 2b2bc61af4..da5d2369dd 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
@@ -43,6 +43,7 @@ import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.sql.calcite.aggregation.ApproxCountDistinctSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
@@ -487,7 +488,8 @@ public class SqlBenchmark
         CalciteTests.DRUID_SCHEMA_NAME,
         new CalciteRulesManager(ImmutableSet.of()),
         CalciteTests.createJoinableFactoryWrapper(),
-        CatalogResolver.NULL_RESOLVER
+        CatalogResolver.NULL_RESOLVER,
+        new AuthConfig()
     );
   }
 
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
index 2bfa5773a9..1c64d7a749 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
@@ -36,6 +36,7 @@ import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.sql.calcite.SqlVectorizedExpressionSanityTest;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
@@ -324,7 +325,8 @@ public class SqlExpressionBenchmark
         CalciteTests.DRUID_SCHEMA_NAME,
         new CalciteRulesManager(ImmutableSet.of()),
         CalciteTests.createJoinableFactoryWrapper(),
-        CatalogResolver.NULL_RESOLVER
+        CatalogResolver.NULL_RESOLVER,
+        new AuthConfig()
     );
 
     try {
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
index d509bd2ede..42f7e57418 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
@@ -44,6 +44,7 @@ import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.sql.calcite.SqlVectorizedExpressionSanityTest;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
@@ -313,7 +314,8 @@ public class SqlNestedDataBenchmark
         CalciteTests.DRUID_SCHEMA_NAME,
         new CalciteRulesManager(ImmutableSet.of()),
         CalciteTests.createJoinableFactoryWrapper(),
-        CatalogResolver.NULL_RESOLVER
+        CatalogResolver.NULL_RESOLVER,
+        new AuthConfig()
     );
 
     try {
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
index 3d5b2aa381..1cb747048d 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
@@ -38,6 +38,7 @@ import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
 import org.apache.druid.sql.calcite.planner.CatalogResolver;
@@ -130,7 +131,8 @@ public class SqlVsNativeBenchmark
         CalciteTests.DRUID_SCHEMA_NAME,
         new CalciteRulesManager(ImmutableSet.of()),
         CalciteTests.createJoinableFactoryWrapper(),
-        CatalogResolver.NULL_RESOLVER
+        CatalogResolver.NULL_RESOLVER,
+        new AuthConfig()
     );
     groupByQuery = GroupByQuery
         .builder()
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 705e2c9597..a684315b83 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -134,6 +134,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFacto
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.sql.DirectStatement;
 import org.apache.druid.sql.SqlQueryPlus;
@@ -510,7 +511,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
         CalciteTests.DRUID_SCHEMA_NAME,
         new CalciteRulesManager(ImmutableSet.of()),
         CalciteTests.createJoinableFactoryWrapper(),
-        catalogResolver
+        catalogResolver,
+        new AuthConfig()
     );
 
     sqlStatementFactory = CalciteTests.createSqlStatementFactory(engine, plannerFactory);
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/catalog/model/table/S3InputSourceDefn.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/catalog/model/table/S3InputSourceDefn.java
index 73150775e2..358862d033 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/catalog/model/table/S3InputSourceDefn.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/catalog/model/table/S3InputSourceDefn.java
@@ -71,6 +71,7 @@ import java.util.Map;
  */
 public class S3InputSourceDefn extends FormattedInputSourceDefn
 {
+  public static final String TYPE_KEY = S3StorageDruidModule.SCHEME;
   public static final String URIS_PARAMETER = "uris";
   public static final String PREFIXES_PARAMETER = "prefixes";
   public static final String BUCKET_PARAMETER = "bucket";
@@ -117,7 +118,7 @@ public class S3InputSourceDefn extends FormattedInputSourceDefn
   @Override
   public String typeValue()
   {
-    return S3StorageDruidModule.SCHEME;
+    return TYPE_KEY;
   }
 
   @Override
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
index ec55828ca8..759cbc6875 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
@@ -345,6 +345,7 @@ public class S3InputSourceDefnTest
         CatalogUtils.stringListToUriList(uris),
         s3InputSource.getUris()
     );
+    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
 
     // But, it fails if there are no columns.
     assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), mapper));
@@ -365,6 +366,7 @@ public class S3InputSourceDefnTest
         CatalogUtils.stringListToUriList(uris),
         s3InputSource.getUris()
     );
+    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
   }
 
   @Test
@@ -397,6 +399,7 @@ public class S3InputSourceDefnTest
         s3InputSource.getUris()
     );
     assertEquals("*.csv", s3InputSource.getObjectGlob());
+    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
   }
 
   @Test
@@ -416,6 +419,7 @@ public class S3InputSourceDefnTest
         CatalogUtils.stringListToUriList(prefixes),
         s3InputSource.getPrefixes()
     );
+    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
 
     // But, it fails if there are no columns.
     assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), mapper));
@@ -438,6 +442,7 @@ public class S3InputSourceDefnTest
         CatalogUtils.stringListToUriList(prefixes),
         s3InputSource.getPrefixes()
     );
+    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
   }
 
   @Test
@@ -457,6 +462,7 @@ public class S3InputSourceDefnTest
     CloudObjectLocation obj = s3InputSource.getObjects().get(0);
     assertEquals("foo.com", obj.getBucket());
     assertEquals("bar/file.csv", obj.getPath());
+    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
 
     // But, it fails if there are no columns.
     assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), mapper));
@@ -504,6 +510,7 @@ public class S3InputSourceDefnTest
     obj = s3InputSource.getObjects().get(1);
     assertEquals("foo.com", obj.getBucket());
     assertEquals("mumble/file2.csv", obj.getPath());
+    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
   }
 
   @Test
@@ -571,6 +578,7 @@ public class S3InputSourceDefnTest
     ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
     ExternalTableSpec externSpec = externDefn.convert(resolved);
     assertEquals(s3InputSource, externSpec.inputSource);
+    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
 
     // Get the partial table function
     TableFunction fn = externDefn.tableFn(resolved);
@@ -580,6 +588,7 @@ public class S3InputSourceDefnTest
     externSpec = fn.apply("x", Collections.emptyMap(), Collections.emptyList(), mapper);
     assertEquals(s3InputSource, externSpec.inputSource);
 
+
     // But, it fails columns are provided since the table already has them.
     assertThrows(IAE.class, () -> fn.apply("x", Collections.emptyMap(), COLUMNS, mapper));
   }
@@ -604,6 +613,7 @@ public class S3InputSourceDefnTest
     ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
     ExternalTableSpec externSpec = externDefn.convert(resolved);
     assertEquals(s3InputSource, externSpec.inputSource);
+    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
 
     // Get the partial table function
     TableFunction fn = externDefn.tableFn(resolved);
@@ -662,6 +672,7 @@ public class S3InputSourceDefnTest
     CloudObjectLocation obj = s3InputSource.getObjects().get(0);
     assertEquals("foo.com", obj.getBucket());
     assertEquals("bar/file.csv", obj.getPath());
+    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
 
     // But, it fails columns are provided since the table already has them.
     assertThrows(IAE.class, () -> fn.apply("x", args, COLUMNS, mapper));
@@ -703,6 +714,7 @@ public class S3InputSourceDefnTest
     assertEquals("foo.com", obj.getBucket());
     assertEquals("bar/file.csv", obj.getPath());
     assertTrue(externSpec.inputFormat instanceof CsvInputFormat);
+    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
 
     // But, it fails columns are not provided since the table does not have them.
     assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), mapper));
diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
index 9ec432b71c..83a085ead4 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
@@ -151,7 +151,8 @@ public abstract class BaseInputSourceDefn implements InputSourceDefn
     return new ExternalTableSpec(
         convertArgsToSource(args, jsonMapper),
         convertArgsToFormat(args, columns, jsonMapper),
-        Columns.convertSignature(columns)
+        Columns.convertSignature(columns),
+        typeValue()
     );
   }
 
@@ -207,7 +208,8 @@ public abstract class BaseInputSourceDefn implements InputSourceDefn
     return new ExternalTableSpec(
         convertTableToSource(table),
         convertTableToFormat(table),
-        Columns.convertSignature(table.resolvedTable().spec().columns())
+        Columns.convertSignature(table.resolvedTable().spec().columns()),
+        typeValue()
     );
   }
 
diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java b/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java
index deb814328d..a132ff6728 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java
@@ -36,14 +36,17 @@ public class ExternalTableSpec
   public final InputSource inputSource;
   public final InputFormat inputFormat;
   @Nullable public final RowSignature signature;
+  public final String inputSourceType;
 
   public ExternalTableSpec(
       final InputSource inputSource,
       final InputFormat inputFormat,
-      final RowSignature signature)
+      final RowSignature signature,
+      final String inputSourceType)
   {
     this.inputSource = inputSource;
     this.inputFormat = inputFormat;
     this.signature = signature;
+    this.inputSourceType = inputSourceType;
   }
 }
diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
index 7cd16ee7ca..bf0a904bca 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
@@ -193,7 +193,8 @@ public abstract class FormattedInputSourceDefn extends BaseInputSourceDefn
     return new ExternalTableSpec(
         convertSource(sourceMap, jsonMapper),
         inputFormat,
-        Columns.convertSignature(completedCols)
+        Columns.convertSignature(completedCols),
+        typeValue()
     );
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/security/AuthConfig.java b/server/src/main/java/org/apache/druid/server/security/AuthConfig.java
index 8bbdac7003..8413155a8f 100644
--- a/server/src/main/java/org/apache/druid/server/security/AuthConfig.java
+++ b/server/src/main/java/org/apache/druid/server/security/AuthConfig.java
@@ -63,7 +63,7 @@ public class AuthConfig
 
   public AuthConfig()
   {
-    this(null, null, null, false, false, null, null);
+    this(null, null, null, false, false, null, null, false);
   }
 
   @JsonProperty
@@ -97,6 +97,9 @@ public class AuthConfig
   @JsonProperty
   private final Set<String> securedContextKeys;
 
+  @JsonProperty
+  private final boolean enableInputSourceSecurity;
+
   @JsonCreator
   public AuthConfig(
       @JsonProperty("authenticatorChain") List<String> authenticatorChain,
@@ -105,7 +108,8 @@ public class AuthConfig
       @JsonProperty("allowUnauthenticatedHttpOptions") boolean allowUnauthenticatedHttpOptions,
       @JsonProperty("authorizeQueryContextParams") boolean authorizeQueryContextParams,
       @JsonProperty("unsecuredContextKeys") Set<String> unsecuredContextKeys,
-      @JsonProperty("securedContextKeys") Set<String> securedContextKeys
+      @JsonProperty("securedContextKeys") Set<String> securedContextKeys,
+      @JsonProperty("enableInputSourceSecurity") boolean enableInputSourceSecurity
   )
   {
     this.authenticatorChain = authenticatorChain;
@@ -117,6 +121,7 @@ public class AuthConfig
         ? Collections.emptySet()
         : unsecuredContextKeys;
     this.securedContextKeys = securedContextKeys;
+    this.enableInputSourceSecurity = enableInputSourceSecurity;
   }
 
   public List<String> getAuthenticatorChain()
@@ -144,6 +149,11 @@ public class AuthConfig
     return authorizeQueryContextParams;
   }
 
+  public boolean isEnableInputSourceSecurity()
+  {
+    return enableInputSourceSecurity;
+  }
+
   /**
    * Filter the user-supplied context keys based on the context key security
    * rules. If context key security is disabled, then allow all keys. Else,
@@ -190,7 +200,8 @@ public class AuthConfig
            && Objects.equals(authorizers, that.authorizers)
            && Objects.equals(unsecuredPaths, that.unsecuredPaths)
            && Objects.equals(unsecuredContextKeys, that.unsecuredContextKeys)
-           && Objects.equals(securedContextKeys, that.securedContextKeys);
+           && Objects.equals(securedContextKeys, that.securedContextKeys)
+           && Objects.equals(enableInputSourceSecurity, that.enableInputSourceSecurity);
   }
 
   @Override
@@ -203,7 +214,8 @@ public class AuthConfig
         allowUnauthenticatedHttpOptions,
         authorizeQueryContextParams,
         unsecuredContextKeys,
-        securedContextKeys
+        securedContextKeys,
+        enableInputSourceSecurity
     );
   }
 
@@ -218,6 +230,7 @@ public class AuthConfig
            ", enableQueryContextAuthorization=" + authorizeQueryContextParams +
            ", unsecuredContextKeys=" + unsecuredContextKeys +
            ", securedContextKeys=" + securedContextKeys +
+           ", enableInputSourceSecurity=" + enableInputSourceSecurity +
            '}';
   }
 
@@ -238,6 +251,7 @@ public class AuthConfig
     private boolean authorizeQueryContextParams;
     private Set<String> unsecuredContextKeys;
     private Set<String> securedContextKeys;
+    private boolean enableInputSourceSecurity;
 
     public Builder setAuthenticatorChain(List<String> authenticatorChain)
     {
@@ -281,6 +295,12 @@ public class AuthConfig
       return this;
     }
 
+    public Builder setEnableInputSourceSecurity(boolean enableInputSourceSecurity)
+    {
+      this.enableInputSourceSecurity = enableInputSourceSecurity;
+      return this;
+    }
+
     public AuthConfig build()
     {
       return new AuthConfig(
@@ -290,7 +310,8 @@ public class AuthConfig
           allowUnauthenticatedHttpOptions,
           authorizeQueryContextParams,
           unsecuredContextKeys,
-          securedContextKeys
+          securedContextKeys,
+          enableInputSourceSecurity
       );
     }
   }
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
index 0be1f951bf..21e6f14eac 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
@@ -398,6 +398,7 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
         CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/foo.csv", "http://foo.com/bar.csv")),
         sourceSpec.getUris()
     );
+    assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
   }
 
   @Test
@@ -441,6 +442,7 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
         CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/my.csv", "http://foo.com/bar.csv")),
         sourceSpec.getUris()
     );
+    assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
   }
 
   @Test
@@ -464,6 +466,7 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
         CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/foo.csv", "http://foo.com/bar.csv")),
         sourceSpec.getUris()
     );
+    assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
   }
 
   @Test
@@ -492,7 +495,11 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
 
     HttpInputSource sourceSpec = (HttpInputSource) externSpec.inputSource;
     assertEquals("bob", sourceSpec.getHttpAuthenticationUsername());
-    assertEquals("SECRET", ((EnvironmentVariablePasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getVariable());
+    assertEquals(
+        "SECRET",
+        ((EnvironmentVariablePasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getVariable()
+    );
+    assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
   }
 
   private void validateHappyPath(ExternalTableSpec externSpec, boolean withUser)
@@ -512,6 +519,7 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
     assertEquals(Arrays.asList("x", "y"), sig.getColumnNames());
     assertEquals(ColumnType.STRING, sig.getColumnType(0).get());
     assertEquals(ColumnType.LONG, sig.getColumnType(1).get());
+    assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
   }
 
   private Map<String, Object> httpToMap(HttpInputSource source)
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
index 499ff8d595..daae0bccd0 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
@@ -144,6 +144,7 @@ public class InlineInputSourceDefnTest extends BaseExternTableTest
     CsvInputFormat format = (CsvInputFormat) extern.inputFormat;
     assertEquals(Arrays.asList("a", "b"), format.getColumns());
     assertEquals(2, extern.signature.size());
+    assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
 
     // Fails if no columns are provided.
     assertThrows(IAE.class, () -> fn.apply("x", new HashMap<>(), Collections.emptyList(), mapper));
@@ -178,6 +179,7 @@ public class InlineInputSourceDefnTest extends BaseExternTableTest
     CsvInputFormat actualFormat = (CsvInputFormat) extern.inputFormat;
     assertEquals(Arrays.asList("a", "b"), actualFormat.getColumns());
     assertEquals(2, extern.signature.size());
+    assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
 
     // Cannot supply columns with the function
     List<ColumnSpec> columns = Arrays.asList(
@@ -213,5 +215,6 @@ public class InlineInputSourceDefnTest extends BaseExternTableTest
     CsvInputFormat actualFormat = (CsvInputFormat) extern.inputFormat;
     assertEquals(Arrays.asList("a", "b"), actualFormat.getColumns());
     assertEquals(2, extern.signature.size());
+    assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
   }
 }
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
index 79c79fd44c..a71a00f4b9 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
@@ -536,5 +536,6 @@ public class LocalInputSourceDefnTest extends BaseExternTableTest
     assertEquals(Arrays.asList("x", "y"), sig.getColumnNames());
     assertEquals(ColumnType.STRING, sig.getColumnType(0).get());
     assertEquals(ColumnType.LONG, sig.getColumnType(1).get());
+    assertEquals(LocalInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
   }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java b/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
index 05152625d8..99d2fa17a6 100644
--- a/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
@@ -152,7 +152,8 @@ public abstract class AbstractStatement implements Closeable
       final Function<Set<ResourceAction>, Access> authorizer
   )
   {
-    Set<String> securedKeys = this.sqlToolbox.authConfig.contextKeysToAuthorize(queryPlus.context().keySet());
+    Set<String> securedKeys = this.sqlToolbox.plannerFactory.getAuthConfig()
+        .contextKeysToAuthorize(queryPlus.context().keySet());
     Set<ResourceAction> contextResources = new HashSet<>();
     securedKeys.forEach(key -> contextResources.add(
         new ResourceAction(new Resource(key, ResourceType.QUERY_CONTEXT), Action.WRITE)
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlToolbox.java b/sql/src/main/java/org/apache/druid/sql/SqlToolbox.java
index abbac66829..6ba8b59e57 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlToolbox.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlToolbox.java
@@ -24,7 +24,6 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.DefaultQueryConfig;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.log.RequestLogger;
-import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.sql.calcite.planner.PlannerFactory;
 import org.apache.druid.sql.calcite.run.SqlEngine;
 
@@ -38,7 +37,6 @@ public class SqlToolbox
   final ServiceEmitter emitter;
   final RequestLogger requestLogger;
   final QueryScheduler queryScheduler;
-  final AuthConfig authConfig;
   final DefaultQueryConfig defaultQueryConfig;
   final SqlLifecycleManager sqlLifecycleManager;
 
@@ -48,7 +46,6 @@ public class SqlToolbox
       final ServiceEmitter emitter,
       final RequestLogger requestLogger,
       final QueryScheduler queryScheduler,
-      final AuthConfig authConfig,
       final DefaultQueryConfig defaultQueryConfig,
       final SqlLifecycleManager sqlLifecycleManager
   )
@@ -58,7 +55,6 @@ public class SqlToolbox
     this.emitter = emitter;
     this.requestLogger = requestLogger;
     this.queryScheduler = queryScheduler;
-    this.authConfig = authConfig;
     this.defaultQueryConfig = defaultQueryConfig;
     this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager, "sqlLifecycleManager");
   }
@@ -71,7 +67,6 @@ public class SqlToolbox
         emitter,
         requestLogger,
         queryScheduler,
-        authConfig,
         defaultQueryConfig,
         sqlLifecycleManager
     );
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/AuthorizableOperator.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/AuthorizableOperator.java
index 1c6beb31b4..e0d9b09a21 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/AuthorizableOperator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/AuthorizableOperator.java
@@ -31,5 +31,5 @@ import java.util.Set;
  */
 public interface AuthorizableOperator
 {
-  Set<ResourceAction> computeResources(SqlCall call);
+  Set<ResourceAction> computeResources(SqlCall call, boolean inputSourceTypeSecurityEnabled);
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
new file mode 100644
index 0000000000..e8740ba5bb
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.external;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.NlsString;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.calcite.table.DruidTable;
+
+import javax.validation.constraints.NotNull;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Used by {@link ExternalOperatorConversion} to generate a {@link DruidTable}
+ * that references an {@link ExternalDataSource}.
+ */
+public class DruidExternTableMacro extends DruidUserDefinedTableMacro
+{
+  public DruidExternTableMacro(DruidTableMacro macro)
+  {
+    super(macro);
+  }
+
+  /**
+   * Returns {@link Externals#EXTERNAL_RESOURCE_ACTION} when input source security is disabled; otherwise
+   * a READ action on a {@link Resource} built from {@link ResourceType#EXTERNAL} and the inputSource JSON "type".
+   */
+  @Override
+  public Set<ResourceAction> computeResources(final SqlCall call, boolean inputSourceTypeSecurityEnabled)
+  {
+    if (!inputSourceTypeSecurityEnabled) {
+      return Collections.singleton(Externals.EXTERNAL_RESOURCE_ACTION);
+    }
+    String inputSourceStr = getInputSourceArgument(call);
+
+    try {
+      JsonNode jsonNode = ((DruidTableMacro) macro).getJsonMapper().readTree(inputSourceStr);
+      return Collections.singleton(new ResourceAction(new Resource(
+          ResourceType.EXTERNAL,
+          jsonNode.get("type").asText()
+      ), Action.READ));
+    }
+    catch (JsonProcessingException e) {
+      // this shouldn't happen, the input source parameter should have been validated before this
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Extracts the inputSource string literal from an EXTERN call that uses positional or named arguments. */
+  @NotNull
+  private String getInputSourceArgument(final SqlCall call)
+  {
+    // Positional form: the input source is the first operand, given directly as a string literal.
+    if (!call.getOperandList().isEmpty() && call.getOperandList().get(0) instanceof SqlCharStringLiteral) {
+      return ((SqlCharStringLiteral) call.getOperandList().get(0)).toValue();
+    }
+
+    // Named form: each operand is itself a call holding the value at index 0 and the argument name at index 1.
+    for (SqlNode sqlNode : call.getOperandList()) {
+      if (sqlNode instanceof SqlCall) {
+        final SqlCall namedArg = (SqlCall) sqlNode;
+        final String argumentName =
+            namedArg.getOperandList().size() > 1 ? namedArg.getOperandList().get(1).toString() : null;
+        if (ExternalOperatorConversion.INPUT_SOURCE_PARAM.equals(argumentName)) {
+          // Read the value from the matched operand: named arguments can be given in any order, so the
+          // first operand of the outer call is not necessarily the input source.
+          return ((NlsString) ((SqlCharStringLiteral) namedArg.getOperandList().get(0)).getValue()).getValue();
+        }
+      }
+    }
+    // this shouldn't happen: the sqlCall should already be validated and guaranteed to have this parameter.
+    throw new RuntimeException("inputSource parameter not found in extern function");
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacroConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacroConversion.java
new file mode 100644
index 0000000000..9ac53798cd
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacroConversion.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.external;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
+import org.apache.druid.catalog.model.table.TableFunction;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+
+import javax.annotation.Nullable;
+
+/**
+ * Operator conversion for the {@code EXTERN} table macro, built from the catalog's {@link TableFunction}
+ * abstraction. The operator is a {@link DruidExternTableMacro}; {@link #toDruidExpression} always returns null.
+ */
+public class DruidExternTableMacroConversion implements SqlOperatorConversion
+{
+  private final SqlUserDefinedTableMacro operator;
+
+  public DruidExternTableMacroConversion(
+      final String name,
+      final TableFunction fn,
+      final ObjectMapper jsonMapper
+  )
+  {
+    this.operator = new DruidExternTableMacro(
+        new DruidTableMacro(name, fn, jsonMapper)
+    );
+  }
+
+  @Override
+  public SqlOperator calciteOperator()
+  {
+    return operator;
+  }
+
+  @Nullable
+  @Override
+  public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode)
+  {
+    return null;
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java
index 5b4069f1c0..65e00db7c6 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java
@@ -98,4 +98,9 @@ public class DruidTableMacro implements ExtendedTableMacro
   {
     return parameters;
   }
+
+  public ObjectMapper getJsonMapper()
+  {
+    return jsonMapper;
+  }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java
index 09f495fa19..85753f0dc0 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java
@@ -63,7 +63,7 @@ public class DruidUserDefinedTableMacro extends SchemaAwareUserDefinedTableMacro
   }
 
   @Override
-  public Set<ResourceAction> computeResources(final SqlCall call)
+  public Set<ResourceAction> computeResources(final SqlCall call, final boolean inputSourceTypeSecurityEnabled)
   {
     return Collections.singleton(Externals.EXTERNAL_RESOURCE_ACTION);
   }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
index 1d5a5c4fef..6b1b1fb624 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
@@ -54,7 +54,7 @@ import java.util.Map;
  * a Druid JSON signature, or an SQL {@code EXTEND} list of columns.
  * As with all table functions, the {@code EXTEND} is optional.
  */
-public class ExternalOperatorConversion extends DruidUserDefinedTableMacroConversion
+public class ExternalOperatorConversion extends DruidExternTableMacroConversion
 {
   public static final String FUNCTION_NAME = "EXTERN";
 
@@ -110,10 +110,13 @@ public class ExternalOperatorConversion extends DruidUserDefinedTableMacroConver
           rowSignature = jsonMapper.readValue(sigValue, RowSignature.class);
         }
 
+        String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM);
+        String inputSrcType = jsonMapper.readTree(inputSrcStr).get("type").asText();
         return new ExternalTableSpec(
-            jsonMapper.readValue(CatalogUtils.getString(args, INPUT_SOURCE_PARAM), InputSource.class),
+            jsonMapper.readValue(inputSrcStr, InputSource.class),
             jsonMapper.readValue(CatalogUtils.getString(args, INPUT_FORMAT_PARAM), InputFormat.class),
-            rowSignature
+            rowSignature,
+            inputSrcType
         );
       }
       catch (JsonProcessingException e) {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java
deleted file mode 100644
index ac65880c1e..0000000000
--- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.sql.calcite.external;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.schema.FunctionParameter;
-import org.apache.calcite.schema.TableMacro;
-import org.apache.calcite.schema.TranslatableTable;
-import org.apache.druid.catalog.model.table.ExternalTableSpec;
-import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.InputSource;
-import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
-import org.apache.druid.sql.calcite.table.DruidTable;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Used by {@link ExternalOperatorConversion} to generate a {@link DruidTable}
- * that references an {@link ExternalDataSource}.
- *
- * This class is exercised in CalciteInsertDmlTest but is not currently exposed to end users.
- */
-public class ExternalTableMacro implements TableMacro
-{
-  private final List<FunctionParameter> parameters = ImmutableList.of(
-      new FunctionParameterImpl(0, "inputSource", DruidTypeSystem.TYPE_FACTORY.createJavaType(String.class)),
-      new FunctionParameterImpl(1, "inputFormat", DruidTypeSystem.TYPE_FACTORY.createJavaType(String.class)),
-      new FunctionParameterImpl(2, "signature", DruidTypeSystem.TYPE_FACTORY.createJavaType(String.class))
-  );
-
-  private final ObjectMapper jsonMapper;
-
-  public ExternalTableMacro(final ObjectMapper jsonMapper)
-  {
-    this.jsonMapper = jsonMapper;
-  }
-
-  public String signature()
-  {
-    final List<String> names = parameters.stream().map(p -> p.getName()).collect(Collectors.toList());
-    return "(" + String.join(", ", names) + ")";
-  }
-
-  @Override
-  public TranslatableTable apply(final List<Object> arguments)
-  {
-    try {
-      ExternalTableSpec spec = new ExternalTableSpec(
-          jsonMapper.readValue((String) arguments.get(0), InputSource.class),
-          jsonMapper.readValue((String) arguments.get(1), InputFormat.class),
-          jsonMapper.readValue((String) arguments.get(2), RowSignature.class)
-      );
-      return Externals.buildExternalTable(spec, jsonMapper);
-    }
-    catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public List<FunctionParameter> getParameters()
-  {
-    return parameters;
-  }
-}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java
index db1d38618d..595c825718 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java
@@ -279,7 +279,10 @@ public class Externals
    * Create an MSQ ExternalTable given an external table spec. Enforces type restructions
    * (which should be revisited.)
    */
-  public static ExternalTable buildExternalTable(ExternalTableSpec spec, ObjectMapper jsonMapper)
+  public static ExternalTable buildExternalTable(
+      ExternalTableSpec spec,
+      ObjectMapper jsonMapper
+  )
   {
     // Prevent a RowSignature that has a ColumnSignature with name "__time" and type that is not LONG because it
     // will be automatically cast to LONG while processing in RowBasedColumnSelectorFactory.
@@ -294,7 +297,7 @@ public class Externals
                     + "Please change the column name to something other than __time");
     }
 
-    return toExternalTable(spec, jsonMapper);
+    return toExternalTable(spec, jsonMapper, spec.inputSourceType);
   }
 
   public static ResourceAction externalRead(String name)
@@ -302,7 +305,11 @@ public class Externals
     return new ResourceAction(new Resource(name, ResourceType.EXTERNAL), Action.READ);
   }
 
-  public static ExternalTable toExternalTable(ExternalTableSpec spec, ObjectMapper jsonMapper)
+  public static ExternalTable toExternalTable(
+      ExternalTableSpec spec,
+      ObjectMapper jsonMapper,
+      String inputSourceType
+  )
   {
     return new ExternalTable(
         new ExternalDataSource(
@@ -311,7 +318,8 @@ public class Externals
             spec.signature
           ),
         spec.signature,
-        jsonMapper
+        jsonMapper,
+        inputSourceType
     );
   }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
index 855b9ceebf..b05c0a0cd8 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
@@ -38,9 +38,14 @@ import org.apache.calcite.sql.type.SqlOperandTypeChecker;
 import org.apache.calcite.sql.type.SqlOperandTypeInference;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
+import org.apache.druid.sql.calcite.table.ExternalTable;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -163,9 +168,17 @@ public abstract class SchemaAwareUserDefinedTableMacro
     }
 
     @Override
-    public Set<ResourceAction> computeResources(final SqlCall call)
+    public Set<ResourceAction> computeResources(final SqlCall call, final boolean inputSourceTypeSecurityEnabled)
     {
-      return base.computeResources(call);
+      Set<ResourceAction> resourceActions = new HashSet<>();
+      if (table instanceof ExternalTable && inputSourceTypeSecurityEnabled) {
+        resourceActions.add(new ResourceAction(new Resource(
+            ResourceType.EXTERNAL,
+            ((ExternalTable) table).getInputSourceType()
+        ), Action.READ));
+      }
+      resourceActions.addAll(base.computeResources(call, inputSourceTypeSecurityEnabled));
+      return resourceActions;
     }
   }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
index 085d476072..4395da5fe4 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
@@ -41,6 +41,7 @@ import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.NoopEscalator;
 import org.apache.druid.sql.calcite.parser.DruidSqlParserImplFactory;
@@ -75,7 +76,8 @@ public class PlannerFactory extends PlannerToolbox
       final @DruidSchemaName String druidSchemaName,
       final CalciteRulesManager calciteRuleManager,
       final JoinableFactoryWrapper joinableFactoryWrapper,
-      final CatalogResolver catalog
+      final CatalogResolver catalog,
+      final AuthConfig authConfig
   )
   {
     super(
@@ -88,7 +90,8 @@ public class PlannerFactory extends PlannerToolbox
         catalog,
         druidSchemaName,
         calciteRuleManager,
-        authorizerMapper
+        authorizerMapper,
+        authConfig
     );
   }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerToolbox.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerToolbox.java
index 24655c85b5..f000982611 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerToolbox.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerToolbox.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
 
@@ -38,6 +39,7 @@ public class PlannerToolbox
   protected final String druidSchemaName;
   protected final CalciteRulesManager calciteRuleManager;
   protected final AuthorizerMapper authorizerMapper;
+  protected final AuthConfig authConfig;
 
   public PlannerToolbox(
       final DruidOperatorTable operatorTable,
@@ -49,7 +51,8 @@ public class PlannerToolbox
       final CatalogResolver catalog,
       final String druidSchemaName,
       final CalciteRulesManager calciteRuleManager,
-      final AuthorizerMapper authorizerMapper
+      final AuthorizerMapper authorizerMapper,
+      final AuthConfig authConfig
   )
   {
     this.operatorTable = operatorTable;
@@ -62,6 +65,7 @@ public class PlannerToolbox
     this.druidSchemaName = druidSchemaName;
     this.calciteRuleManager = calciteRuleManager;
     this.authorizerMapper = authorizerMapper;
+    this.authConfig = authConfig;
   }
 
   public DruidOperatorTable operatorTable()
@@ -108,4 +112,9 @@ public class PlannerToolbox
   {
     return plannerConfig;
   }
+
+  public AuthConfig getAuthConfig()
+  {
+    return authConfig;
+  }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
index 464648cc53..71c532d736 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
@@ -51,19 +51,24 @@ public class SqlResourceCollectorShuttle extends SqlShuttle
   private final Set<ResourceAction> resourceActions;
   private final PlannerContext plannerContext;
   private final SqlValidator validator;
+  private final boolean inputSourceTypeSecurityEnabled;
 
   public SqlResourceCollectorShuttle(SqlValidator validator, PlannerContext plannerContext)
   {
     this.validator = validator;
     this.resourceActions = new HashSet<>();
     this.plannerContext = plannerContext;
+    inputSourceTypeSecurityEnabled = plannerContext.getPlannerToolbox().getAuthConfig().isEnableInputSourceSecurity();
   }
 
   @Override
   public SqlNode visit(SqlCall call)
   {
     if (call.getOperator() instanceof AuthorizableOperator) {
-      resourceActions.addAll(((AuthorizableOperator) call.getOperator()).computeResources(call));
+      resourceActions.addAll(((AuthorizableOperator) call.getOperator()).computeResources(
+          call,
+          inputSourceTypeSecurityEnabled
+      ));
     }
 
     return super.visit(call);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java
index bb98cd93b0..3c6fbdd0ac 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java
@@ -42,6 +42,8 @@ public class ExternalTable extends DruidTable
   private final DataSource dataSource;
   private final ObjectMapper objectMapper;
 
+  private final String inputSourceType;
+
   /**
    * Cached row type, to avoid recreating types multiple times.
    */
@@ -50,12 +52,14 @@ public class ExternalTable extends DruidTable
   public ExternalTable(
       final DataSource dataSource,
       final RowSignature rowSignature,
-      final ObjectMapper objectMapper
+      final ObjectMapper objectMapper,
+      final String inputSourceType
   )
   {
     super(rowSignature);
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.objectMapper = objectMapper;
+    this.inputSourceType = inputSourceType;
   }
 
   @Override
@@ -89,6 +93,11 @@ public class ExternalTable extends DruidTable
     return rowType;
   }
 
+  public String getInputSourceType()
+  {
+    return inputSourceType;
+  }
+
   @Override
   public RelNode toRel(ToRelContext context, RelOptTable relOptTable)
   {
@@ -101,6 +110,7 @@ public class ExternalTable extends DruidTable
     return "ExternalTable{" +
            "dataSource=" + dataSource +
            ", rowSignature=" + getRowSignature() +
+           ", inputSourceType=" + getInputSourceType() +
            '}';
   }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
index d0ad849d40..56d0d2d5d4 100644
--- a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
+++ b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
@@ -34,7 +34,6 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.DefaultQueryConfig;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.log.RequestLogger;
-import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.sql.SqlLifecycleManager;
 import org.apache.druid.sql.SqlStatementFactory;
 import org.apache.druid.sql.SqlToolbox;
@@ -165,7 +164,6 @@ public class SqlModule implements Module
         final ServiceEmitter emitter,
         final RequestLogger requestLogger,
         final QueryScheduler queryScheduler,
-        final AuthConfig authConfig,
         final Supplier<DefaultQueryConfig> defaultQueryConfig,
         final SqlLifecycleManager sqlLifecycleManager
     )
@@ -176,7 +174,6 @@ public class SqlModule implements Module
           emitter,
           requestLogger,
           queryScheduler,
-          authConfig,
           defaultQueryConfig.get(),
           sqlLifecycleManager
       );
diff --git a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
index aef500ed18..4d96a2ec90 100644
--- a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
@@ -158,7 +158,8 @@ public class SqlStatementTest
         CalciteTests.DRUID_SCHEMA_NAME,
         new CalciteRulesManager(ImmutableSet.of()),
         joinableFactoryWrapper,
-        CatalogResolver.NULL_RESOLVER
+        CatalogResolver.NULL_RESOLVER,
+        new AuthConfig()
     );
 
     this.sqlStatementFactory = new SqlStatementFactory(
@@ -168,7 +169,6 @@ public class SqlStatementTest
             new NoopServiceEmitter(),
             testRequestLogger,
             QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-            new AuthConfig(),
             defaultQueryConfig,
             new SqlLifecycleManager()
         )
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index c54c94bf39..51f536f41a 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -66,6 +66,7 @@ import org.apache.druid.server.log.RequestLogger;
 import org.apache.druid.server.log.TestRequestLogger;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.server.security.AuthenticatorMapper;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -1002,7 +1003,8 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
             CalciteTests.DRUID_SCHEMA_NAME,
             new CalciteRulesManager(ImmutableSet.of()),
             CalciteTests.createJoinableFactoryWrapper(),
-            CatalogResolver.NULL_RESOLVER
+            CatalogResolver.NULL_RESOLVER,
+            new AuthConfig()
         )
     );
   }
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
index b039ff07d0..618665c25f 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.security.AllowAllAuthenticator;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.sql.SqlQueryPlus;
 import org.apache.druid.sql.SqlStatementFactory;
@@ -116,7 +117,8 @@ public class DruidStatementTest extends CalciteTestBase
         CalciteTests.DRUID_SCHEMA_NAME,
         new CalciteRulesManager(ImmutableSet.of()),
         joinableFactoryWrapper,
-        CatalogResolver.NULL_RESOLVER
+        CatalogResolver.NULL_RESOLVER,
+        new AuthConfig()
     );
     this.sqlStatementFactory = CalciteTests.createSqlStatementFactory(
         CalciteTests.createMockSqlEngine(walker, conglomerate),
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
index f9c007ff2d..9ce46185a4 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.sql.SqlQueryPlus;
@@ -216,6 +217,7 @@ public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
     private Matcher<Throwable> validationErrorMatcher;
     private String expectedLogicalPlanResource;
     private List<SqlParameter> parameters;
+    private AuthConfig authConfig;
 
     private IngestionDmlTester()
     {
@@ -255,6 +257,12 @@ public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
       return this;
     }
 
+    public IngestionDmlTester authConfig(AuthConfig authConfig)
+    {
+      this.authConfig = authConfig;
+      return this;
+    }
+
     public IngestionDmlTester expectTarget(
         final String expectedTargetDataSource,
         final RowSignature expectedTargetSignature
@@ -378,6 +386,7 @@ public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
           .authResult(authenticationResult)
           .parameters(parameters)
           .plannerConfig(plannerConfig)
+          .authConfig(authConfig)
           .expectedResources(expectedResources)
           .run();
 
@@ -396,6 +405,7 @@ public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
           .authResult(authenticationResult)
           .parameters(parameters)
           .plannerConfig(plannerConfig)
+          .authConfig(authConfig)
           .expectedQuery(expectedQuery)
           .expectedResults(Collections.singletonList(new Object[]{expectedTargetDataSource, expectedTargetSignature}))
           .expectedLogicalPlan(expectedLogicalPlan)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 93e519091f..d9b6b6e32f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.join.JoinType;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.sql.SqlPlanningException;
 import org.apache.druid.sql.calcite.external.ExternalDataSource;
@@ -307,6 +308,39 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
         .verify();
   }
 
+  @Test
+  public void testInsertFromExternalWithInputSourceSecurityEnabled()
+  {
+    testIngestionQuery()
+        .sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", externSql(externalDataSource))
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .authConfig(AuthConfig.newBuilder().setEnableInputSourceSecurity(true).build())
+        .expectTarget("dst", externalDataSource.getSignature())
+        .expectResources(dataSourceWrite("dst"), externalRead("inline"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("x", "y", "z")
+                .context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .expectLogicalPlanFrom("insertFromExternal")
+        .verify();
+  }
+
+  @Test
+  public void testUnauthorizedInsertFromExternalWithInputSourceSecurityEnabled()
+  {
+    testIngestionQuery()
+        .sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", externSql(externalDataSource))
+        .authentication(CalciteTests.REGULAR_USER_AUTH_RESULT)
+        .authConfig(AuthConfig.newBuilder().setEnableInputSourceSecurity(true).build())
+        .expectLogicalPlanFrom("insertFromExternal")
+        .expectValidationError(ForbiddenException.class)
+        .verify();
+  }
+
   @Test
   public void testInsertFromExternalWithSchema()
   {
@@ -351,6 +385,94 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
         .verify();
   }
 
+  @Test
+  public void testInsertFromExternalWithSchemaWithInputsourceSecurity()
+  {
+    String extern;
+    ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
+    try {
+      extern = StringUtils.format(
+          "TABLE(extern(%s, %s))",
+          Calcites.escapeStringLiteral(
+              queryJsonMapper.writeValueAsString(
+                  new InlineInputSource("a,b,1\nc,d,2\n")
+              )
+          ),
+          Calcites.escapeStringLiteral(
+              queryJsonMapper.writeValueAsString(
+                  new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0)
+              )
+          )
+      );
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+    testIngestionQuery()
+        .sql("INSERT INTO dst SELECT * FROM %s\n" +
+             "  (x VARCHAR, y VARCHAR, z BIGINT)\n" +
+             "PARTITIONED BY ALL TIME",
+             extern
+        )
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .authConfig(AuthConfig.newBuilder().setEnableInputSourceSecurity(true).build())
+        .expectTarget("dst", externalDataSource.getSignature())
+        .expectResources(dataSourceWrite("dst"), externalRead("inline"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("x", "y", "z")
+                .context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .expectLogicalPlanFrom("insertFromExternal")
+        .verify();
+  }
+
+  @Test
+  public void testInsertFromExternalFunctionalStyleWithSchemaWithInputsourceSecurity()
+  {
+    String extern;
+    ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
+    try {
+      extern = StringUtils.format(
+          "TABLE(extern("
+          + "inputSource => '%s',"
+          + "inputFormat => '%s'))",
+          queryJsonMapper.writeValueAsString(
+              new InlineInputSource("a,b,1\nc,d,2\n")
+          ),
+          queryJsonMapper.writeValueAsString(
+              new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0)
+          )
+      );
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+    testIngestionQuery()
+        .sql("INSERT INTO dst SELECT * FROM %s\n" +
+             "  (x VARCHAR, y VARCHAR, z BIGINT)\n" +
+             "PARTITIONED BY ALL TIME",
+             extern
+        )
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .authConfig(AuthConfig.newBuilder().setEnableInputSourceSecurity(true).build())
+        .expectTarget("dst", externalDataSource.getSignature())
+        .expectResources(dataSourceWrite("dst"), externalRead("inline"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("x", "y", "z")
+                .context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .expectLogicalPlanFrom("insertFromExternal")
+        .verify();
+  }
+
   @Test
   public void testInsertWithPartitionedBy()
   {
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
index 5483fc8141..d757967927 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
@@ -39,6 +39,7 @@ import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
 import org.apache.druid.sql.calcite.planner.CatalogResolver;
@@ -154,7 +155,8 @@ public class SqlVectorizedExpressionSanityTest extends InitializedNullHandlingTe
         CalciteTests.DRUID_SCHEMA_NAME,
         new CalciteRulesManager(ImmutableSet.of()),
         joinableFactoryWrapper,
-        CatalogResolver.NULL_RESOLVER
+        CatalogResolver.NULL_RESOLVER,
+        new AuthConfig()
     );
   }
 
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java b/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java
index 4c33a144da..307f7dcb3e 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/expression/ExpressionTestHelper.java
@@ -46,6 +46,7 @@ import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.virtual.VirtualizedColumnSelectorFactory;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.CatalogResolver;
@@ -96,7 +97,8 @@ class ExpressionTestHelper
       CatalogResolver.NULL_RESOLVER,
       "druid",
       new CalciteRulesManager(ImmutableSet.of()),
-      CalciteTests.TEST_AUTHORIZER_MAPPER
+      CalciteTests.TEST_AUTHORIZER_MAPPER,
+      AuthConfig.newBuilder().build()
   );
   private static final PlannerContext PLANNER_CONTEXT = PlannerContext.create(
       PLANNER_TOOLBOX,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java
index d4c0cf1562..64a626f682 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/external/ExternalTableScanRuleTest.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
 import org.apache.druid.sql.calcite.planner.CatalogResolver;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -69,7 +70,8 @@ public class ExternalTableScanRuleTest
         CatalogResolver.NULL_RESOLVER,
         "druid",
         new CalciteRulesManager(ImmutableSet.of()),
-        CalciteTests.TEST_AUTHORIZER_MAPPER
+        CalciteTests.TEST_AUTHORIZER_MAPPER,
+        AuthConfig.newBuilder().build()
     );
     final PlannerContext plannerContext = PlannerContext.create(
         toolbox,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
index 2796adc433..52e52ec7f8 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.jackson.JacksonModule;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.server.QueryLifecycleFactory;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
@@ -181,7 +182,8 @@ public class CalcitePlannerModuleTest extends CalciteTestBase
         CatalogResolver.NULL_RESOLVER,
         "druid",
         new CalciteRulesManager(ImmutableSet.of()),
-        CalciteTests.TEST_AUTHORIZER_MAPPER
+        CalciteTests.TEST_AUTHORIZER_MAPPER,
+        AuthConfig.newBuilder().build()
     );
     PlannerContext context = PlannerContext.create(
         toolbox,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/DruidRexExecutorTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/DruidRexExecutorTest.java
index 1d325c28b9..bb5edf2d94 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/DruidRexExecutorTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/DruidRexExecutorTest.java
@@ -40,6 +40,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
@@ -98,7 +99,8 @@ public class DruidRexExecutorTest extends InitializedNullHandlingTest
       CatalogResolver.NULL_RESOLVER,
       "druid",
       new CalciteRulesManager(ImmutableSet.of()),
-      CalciteTests.TEST_AUTHORIZER_MAPPER
+      CalciteTests.TEST_AUTHORIZER_MAPPER,
+      AuthConfig.newBuilder().build()
   );
   private static final PlannerContext PLANNER_CONTEXT = PlannerContext.create(
       PLANNER_TOOLBOX,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTestBase.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTestBase.java
index e18342c64a..d0674afa34 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTestBase.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTestBase.java
@@ -119,4 +119,9 @@ public abstract class CalciteTestBase
   {
     return new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE);
   }
+
+  protected static ResourceAction externalRead(final String inputSourceType)
+  {
+    return new ResourceAction(new Resource(ResourceType.EXTERNAL, inputSourceType), Action.READ);
+  }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java
index a7a51ef29d..4855174c76 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java
@@ -122,7 +122,6 @@ public class QueryFrameworkUtils
         new ServiceEmitter("dummy", "dummy", new NoopEmitter()),
         new NoopRequestLogger(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        authConfig,
         new DefaultQueryConfig(ImmutableMap.of()),
         new SqlLifecycleManager()
     );
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
index e51360adcd..37df6055f5 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
@@ -429,7 +429,8 @@ public class SqlTestFramework
           CalciteTests.DRUID_SCHEMA_NAME,
           new CalciteRulesManager(componentSupplier.extensionCalciteRules()),
           framework.injector.getInstance(JoinableFactoryWrapper.class),
-          framework.builder.catalogResolver
+          framework.builder.catalogResolver,
+          authConfig != null ? authConfig : new AuthConfig()
       );
       componentSupplier.finalizePlanner(this);
       this.statementFactory = QueryFrameworkUtils.createSqlStatementFactory(
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index ce84cb3f4b..339e713e30 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -238,7 +238,8 @@ public class SqlResourceTest extends CalciteTestBase
         CalciteTests.DRUID_SCHEMA_NAME,
         new CalciteRulesManager(ImmutableSet.of()),
         CalciteTests.createJoinableFactoryWrapper(),
-        CatalogResolver.NULL_RESOLVER
+        CatalogResolver.NULL_RESOLVER,
+        new AuthConfig()
     );
 
     lifecycleManager = new SqlLifecycleManager()
@@ -262,7 +263,6 @@ public class SqlResourceTest extends CalciteTestBase
         stubServiceEmitter,
         testRequestLogger,
         scheduler,
-        authConfig,
         defaultQueryConfig,
         lifecycleManager
     );


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org