You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2021/03/23 05:15:12 UTC

[druid] branch 0.20.2 updated: Allow list for JDBC properties

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.20.2
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.20.2 by this push:
     new 48953e35 Allow list for JDBC properties
48953e35 is described below

commit 48953e3508967f5156c69676432b5d4dd25ea678
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Mar 22 22:14:31 2021 -0700

    Allow list for JDBC properties
---
 .../org/apache/druid/utils/ConnectionUriUtils.java |  63 +++
 .../java/org/apache/druid/utils/Throwables.java    |  40 ++
 .../apache/druid/utils/ConnectionUriUtilsTest.java |  90 +++++
 .../org/apache/druid/utils/ThrowablesTest.java     |  48 +++
 docs/configuration/index.md                        |  19 +
 docs/development/extensions-core/druid-lookups.md  |  16 +-
 .../extensions-core/lookups-cached-global.md       |   6 +-
 docs/development/extensions-core/mysql.md          |   1 -
 docs/ingestion/native-batch.md                     |   4 +-
 extensions-core/lookups-cached-global/pom.xml      |   9 +-
 .../lookup/namespace/JdbcExtractionNamespace.java  | 102 ++++-
 .../JdbcExtractionNamespaceUrlCheckTest.java       | 431 +++++++++++++++++++++
 .../lookup/namespace/JdbcCacheGeneratorTest.java   |   6 +-
 .../cache/JdbcExtractionNamespaceTest.java         |  25 +-
 extensions-core/lookups-cached-single/pom.xml      |   9 +-
 .../druid/server/lookup/jdbc/JdbcDataFetcher.java  | 103 ++++-
 .../server/lookup/jdbc/JdbcDataFetcherTest.java    |  19 +-
 .../lookup/jdbc/JdbcDataFetcherUrlCheckTest.java   | 409 +++++++++++++++++++
 extensions-core/mysql-metadata-storage/pom.xml     |   5 +
 .../sql/MySQLFirehoseDatabaseConnector.java        |  52 ++-
 .../sql/MySQLFirehoseDatabaseConnectorTest.java    | 234 +++++++++++
 .../PostgresqlFirehoseDatabaseConnector.java       |  32 +-
 .../storage/postgresql/PostgreSQLConnector.java    |   2 +-
 .../PostgresqlFirehoseDatabaseConnectorTest.java   | 211 ++++++++++
 .../druid/initialization/Initialization.java       |   4 +-
 .../metadata/SQLFirehoseDatabaseConnector.java     |  35 +-
 .../ExternalStorageAccessSecurityModule.java       |  44 +++
 .../initialization/JdbcAccessSecurityConfig.java   | 100 +++++
 .../druid/metadata/input/SqlInputSourceTest.java   |  21 +-
 .../apache/druid/metadata/input/SqlTestUtils.java  |  22 +-
 .../realtime/firehose/SqlFirehoseFactoryTest.java  |  22 --
 .../ExternalStorageAccessSecurityModuleTest.java   |  90 +++++
 website/.spelling                                  |   4 +
 33 files changed, 2209 insertions(+), 69 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/utils/ConnectionUriUtils.java b/core/src/main/java/org/apache/druid/utils/ConnectionUriUtils.java
new file mode 100644
index 0000000..21f47ec
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/utils/ConnectionUriUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Set;
+
+public final class ConnectionUriUtils
+{
+  // Note: MySQL JDBC connector 8 supports 7 more protocols in addition to `jdbc:mysql:`
+  // (https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-jdbc-url-format.html).
+  // We should consider either expanding the recognized mysql protocols or restricting the allowed protocols
+  // to just the basic one.
+  public static final String MYSQL_PREFIX = "jdbc:mysql:";
+  public static final String POSTGRES_PREFIX = "jdbc:postgresql:";
+
+  /**
+   * Validates {@code actualProperties} against {@code allowedProperties}, skipping system properties.
+   * A property counts as a system property when its name starts with any prefix in {@code systemPropertyPrefixes}.
+   * See org.apache.druid.server.initialization.JdbcAccessSecurityConfig for more details.
+   *
+   * @throws IllegalArgumentException if a non-system property that is not in the allowed list is found
+   */
+  public static void throwIfPropertiesAreNotAllowed(
+      Set<String> actualProperties,
+      Set<String> systemPropertyPrefixes,
+      Set<String> allowedProperties
+  )
+  {
+    for (String candidate : actualProperties) {
+      final boolean isSystemProperty = systemPropertyPrefixes.stream().anyMatch(candidate::startsWith);
+      if (isSystemProperty) {
+        // System properties bypass the allow list entirely.
+        continue;
+      }
+      final boolean isAllowed = allowedProperties.contains(candidate);
+      final String errorFormat = "The property [%s] is not in the allowed list %s";
+      Preconditions.checkArgument(isAllowed, errorFormat, candidate, allowedProperties);
+    }
+  }
+
+  private ConnectionUriUtils()
+  {
+  }
+}
diff --git a/core/src/main/java/org/apache/druid/utils/Throwables.java b/core/src/main/java/org/apache/druid/utils/Throwables.java
new file mode 100644
index 0000000..9aaad86
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/utils/Throwables.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+public final class Throwables
+{
+  /**
+   * Returns true if {@code t} or any throwable in its cause chain is an instance of {@code searchFor}.
+   */
+  public static boolean isThrowable(Throwable t, Class<? extends Throwable> searchFor)
+  {
+    // searchFor must be the supertype side of the check so that subclasses of searchFor match as well.
+    for (Throwable current = t; current != null; current = current.getCause()) {
+      if (searchFor.isInstance(current)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Throwables()
+  {
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/utils/ConnectionUriUtilsTest.java b/core/src/test/java/org/apache/druid/utils/ConnectionUriUtilsTest.java
new file mode 100644
index 0000000..f5edd9b
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/utils/ConnectionUriUtilsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+@RunWith(Enclosed.class)
+public class ConnectionUriUtilsTest
+{
+  public static class ThrowIfURLHasNotAllowedPropertiesTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testEmptyActualProperties()
+    {
+      ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
+          ImmutableSet.of(),
+          ImmutableSet.of("system_key1", "system_key2"),
+          ImmutableSet.of("valid_key1", "valid_key2")
+      );
+    }
+
+    @Test
+    public void testThrowForNonAllowedProperties()
+    {
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("The property [invalid_key] is not in the allowed list [valid_key1, valid_key2]");
+
+      ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
+          ImmutableSet.of("valid_key1", "invalid_key"),
+          ImmutableSet.of("system_key1", "system_key2"),
+          ImmutableSet.of("valid_key1", "valid_key2")
+      );
+    }
+
+    @Test
+    public void testAllowedProperties()
+    {
+      ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
+          ImmutableSet.of("valid_key2"),
+          ImmutableSet.of("system_key1", "system_key2"),
+          ImmutableSet.of("valid_key1", "valid_key2")
+      );
+    }
+
+    @Test
+    public void testAllowSystemProperties()
+    {
+      ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
+          ImmutableSet.of("system_key1", "valid_key2"),
+          ImmutableSet.of("system_key1", "system_key2"),
+          ImmutableSet.of("valid_key1", "valid_key2")
+      );
+    }
+
+    @Test
+    public void testMatchSystemProperties()
+    {
+      ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
+          ImmutableSet.of("system_key1.1", "system_key1.5", "system_key11.11", "valid_key2"),
+          ImmutableSet.of("system_key1", "system_key2"),
+          ImmutableSet.of("valid_key1", "valid_key2")
+      );
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/utils/ThrowablesTest.java b/core/src/test/java/org/apache/druid/utils/ThrowablesTest.java
new file mode 100644
index 0000000..eaa34cd
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/utils/ThrowablesTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ThrowablesTest
+{
+  @Test
+  public void testIsThrowableItself()
+  {
+    // The throwable itself is of the searched type; no cause chain is involved.
+    final Throwable target = new NoClassDefFoundError();
+    Assert.assertTrue(Throwables.isThrowable(target, NoClassDefFoundError.class));
+  }
+
+  @Test
+  public void testIsThrowableNestedThrowable()
+  {
+    final Throwable wrapped = new RuntimeException(new NoClassDefFoundError());
+    Assert.assertTrue(Throwables.isThrowable(wrapped, NoClassDefFoundError.class));
+  }
+
+  @Test
+  public void testIsThrowableNonTarget()
+  {
+    final Throwable wrapped = new RuntimeException(new ClassNotFoundException());
+    Assert.assertFalse(Throwables.isThrowable(wrapped, NoClassDefFoundError.class));
+  }
+}
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 756debd..7db242c 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -515,6 +515,25 @@ This deep storage is used to interface with Cassandra.  Note that the `druid-cas
 |`druid.storage.keyspace`|Cassandra key space.|none|
 
 
+### Ingestion Security Configuration
+
+#### JDBC Connections to External Databases
+
+You can use the following properties to specify permissible JDBC options for:
+- [SQL input source](../ingestion/native-batch.md#sql-input-source)
+- [SQL firehose](../ingestion/native-batch.md#sqlfirehose)
+- [globally cached JDBC lookups](../development/extensions-core/lookups-cached-global.md#jdbc-lookup)
+- [JDBC Data Fetcher for per-lookup caching](../development/extensions-core/druid-lookups.md#data-fetcher-layer)
+
+These properties do not apply to metadata storage connections.
+
+|Property|Possible Values|Description|Default|
+|--------|---------------|-----------|-------|
+|`druid.access.jdbc.enforceAllowedProperties`|Boolean|When true, Druid applies `druid.access.jdbc.allowedProperties` to JDBC connections starting with `jdbc:postgresql:` or `jdbc:mysql:`. When false, Druid allows any kind of JDBC connections without JDBC property validation. This config is deprecated and will be removed in a future release.|false|
+|`druid.access.jdbc.allowedProperties`|List of JDBC properties|Defines a list of allowed JDBC properties. When `druid.access.jdbc.enforceAllowedProperties` is set to true, Druid enforces the list for all JDBC connections starting with `jdbc:postgresql:` or `jdbc:mysql:`.<br/><br/>This option is tested against MySQL connector 5.1.48 and PostgreSQL connector 42.2.14. Other connector versions might not work.|["useSSL", "requireSSL", "ssl", "sslmode"]|
+|`druid.access.jdbc.allowUnknownJdbcUrlFormat`|Boolean|When false, Druid only accepts JDBC connections starting with `jdbc:postgresql:` or `jdbc:mysql:`. When true, Druid allows JDBC connections to any kind of database, but only enforces `druid.access.jdbc.allowedProperties` for PostgreSQL and MySQL.|true|
+
+
 ### Task Logging
 
 If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store, Google Cloud Storage or HDFS.
diff --git a/docs/development/extensions-core/druid-lookups.md b/docs/development/extensions-core/druid-lookups.md
index acede46..b9a0adf 100644
--- a/docs/development/extensions-core/druid-lookups.md
+++ b/docs/development/extensions-core/druid-lookups.md
@@ -72,7 +72,7 @@ Same for Loading cache, developer can implement a new type of loading cache by i
 
 |Field|Type|Description|Required|default|
 |-----|----|-----------|--------|-------|
-|dataFetcher|JSON object|Specifies the lookup data fetcher type  to use in order to fetch data|yes|null|
+|dataFetcher|JSON object|Specifies the lookup data fetcher type for fetching data|yes|null|
 |cacheFactory|JSON Object|Cache factory implementation|no |onHeapPolling|
 |pollPeriod|Period|polling period |no |null (poll once)|
 
@@ -129,7 +129,7 @@ Guava cache configuration spec.
    "type":"loadingLookup",
    "dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"},
    "loadingCacheSpec":{"type":"guava"},
-   "reverseLoadingCacheSpec":{"type":"guava", "maximumSize":500000, "expireAfterAccess":100000, "expireAfterAccess":10000}
+   "reverseLoadingCacheSpec":{"type":"guava", "maximumSize":500000, "expireAfterAccess":100000, "expireAfterWrite":10000}
 }
 ```
 
@@ -150,6 +150,16 @@ Off heap cache is backed by [MapDB](http://www.mapdb.org/) implementation. MapDB
    "type":"loadingLookup",
    "dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"},
    "loadingCacheSpec":{"type":"mapDb", "maxEntriesSize":100000},
-   "reverseLoadingCacheSpec":{"type":"mapDb", "maxStoreSize":5, "expireAfterAccess":100000, "expireAfterAccess":10000}
+   "reverseLoadingCacheSpec":{"type":"mapDb", "maxStoreSize":5, "expireAfterAccess":100000, "expireAfterWrite":10000}
 }
 ```
+
+### JDBC Data Fetcher
+
+|Field|Type|Description|Required|default|
+|-----|----|-----------|--------|-------|
+|`connectorConfig`|JSON object|Specifies the database connection details. You can set `connectURI`, `user` and `password`. You can selectively allow JDBC properties in `connectURI`. See [JDBC connections security config](../../configuration/index.md#jdbc-connections-to-external-databases) for more details.|yes||
+|`table`|string|The table name to read from.|yes||
+|`keyColumn`|string|The column name that contains the lookup key.|yes||
+|`valueColumn`|string|The column name that contains the lookup value.|yes||
+|`streamingFetchSize`|int|Fetch size used in JDBC connections.|no|1000|
diff --git a/docs/development/extensions-core/lookups-cached-global.md b/docs/development/extensions-core/lookups-cached-global.md
index 9ce6997..588bafa 100644
--- a/docs/development/extensions-core/lookups-cached-global.md
+++ b/docs/development/extensions-core/lookups-cached-global.md
@@ -64,7 +64,6 @@ Globally cached lookups can be specified as part of the [cluster wide config for
     "extractionNamespace": {
        "type": "jdbc",
        "connectorConfig": {
-         "createTables": true,
          "connectURI": "jdbc:mysql:\/\/localhost:3306\/druid",
          "user": "druid",
          "password": "diurd"
@@ -107,7 +106,6 @@ In a simple case where only one [tier](../../querying/lookups.html#dynamic-confi
         "extractionNamespace": {
           "type": "jdbc",
           "connectorConfig": {
-            "createTables": true,
             "connectURI": "jdbc:mysql:\/\/localhost:3306\/druid",
             "user": "druid",
             "password": "diurd"
@@ -136,7 +134,6 @@ Where the Coordinator endpoint `/druid/coordinator/v1/lookups/realtime_customer2
     "extractionNamespace": {
       "type": "jdbc",
       "connectorConfig": {
-        "createTables": true,
         "connectURI": "jdbc:mysql://localhost:3306/druid",
         "user": "druid",
         "password": "diurd"
@@ -342,7 +339,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
 |Parameter|Description|Required|Default|
 |---------|-----------|--------|-------|
 |`namespace`|The namespace to define|Yes||
-|`connectorConfig`|The connector config to use|Yes||
+|`connectorConfig`|The connector config to use. You can set `connectURI`, `user` and `password`. You can selectively allow JDBC properties in `connectURI`. See [JDBC connections security config](../../configuration/index.md#jdbc-connections-to-external-databases) for more details.|Yes||
 |`table`|The table which contains the key value pairs|Yes||
 |`keyColumn`|The column in `table` which contains the keys|Yes||
 |`valueColumn`|The column in `table` which contains the values|Yes||
@@ -355,7 +352,6 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
   "type":"jdbc",
   "namespace":"some_lookup",
   "connectorConfig":{
-    "createTables":true,
     "connectURI":"jdbc:mysql://localhost:3306/druid",
     "user":"druid",
     "password":"diurd"
diff --git a/docs/development/extensions-core/mysql.md b/docs/development/extensions-core/mysql.md
index f39e93c..8b61601 100644
--- a/docs/development/extensions-core/mysql.md
+++ b/docs/development/extensions-core/mysql.md
@@ -105,7 +105,6 @@ Copy or symlink this file to `extensions/mysql-metadata-storage` under the distr
 |`druid.metadata.mysql.ssl.enabledSSLCipherSuites`|Overrides the existing cipher suites with these cipher suites.|none|no|
 |`druid.metadata.mysql.ssl.enabledTLSProtocols`|Overrides the TLS protocols with these protocols.|none|no|
 
-
 ### MySQL Firehose
 
 The MySQL extension provides an implementation of an [SqlFirehose](../../ingestion/native-batch.md#firehoses-deprecated) which can be used to ingest data into Druid from a MySQL database.
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index e1d2930..8f855d7 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -1346,7 +1346,7 @@ Please refer to the Recommended practices section below before using this input
 |property|description|required?|
 |--------|-----------|---------|
 |type|This should be "sql".|Yes|
-|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support and this extension must be loaded into Druid. For database types `mysql` and `postgresql`, the `connectorConfig` support is provided by [mysql-metadata-storage](../development/extensions-core/mysql.md) and [postgresql-metadata-storage](../development/extensions-core/postgresql.md) extensions respectively.|Yes|
+|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support. The specified extension must be loaded into Druid:<br/><br/><ul><li>[mysql-metadata-storage](../development/extensions-core/mysql.md) for `mysql`</li><li> [postgresql-metadata-storage](../development/extensions-core/postgresql.md) extension for `postgresql`.</li></ul><br/><br/>You can selectively allow JDBC properties in `connectURI`. See [JDBC  [...]
 |foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|No|
 |sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.|Yes|
 
@@ -1692,7 +1692,7 @@ Requires one of the following extensions:
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should be "sql".||Yes|
-|database|Specifies the database connection details.||Yes|
+|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support. The specified extension must be loaded into Druid:<br/><br/><ul><li>[mysql-metadata-storage](../development/extensions-core/mysql.md) for `mysql`</li><li> [postgresql-metadata-storage](../development/extensions-core/postgresql.md) extension for `postgresql`.</li></ul><br/><br/>You can selectively allow JDBC properties in `connectURI`. See [JDBC  [...]
 |maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|No|
 |maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|No|
 |prefetchTriggerBytes|Threshold to trigger prefetching SQL result objects.|maxFetchCapacityBytes / 2|No|
diff --git a/extensions-core/lookups-cached-global/pom.xml b/extensions-core/lookups-cached-global/pom.xml
index 1b5635a..562704b 100644
--- a/extensions-core/lookups-cached-global/pom.xml
+++ b/extensions-core/lookups-cached-global/pom.xml
@@ -110,12 +110,15 @@
       <artifactId>jsr311-api</artifactId>
       <scope>provided</scope>
     </dependency>
-
-    <!-- Included to improve the out-of-the-box experience for supported JDBC connectors -->
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>${mysql.version}</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.postgresql</groupId>
       <artifactId>postgresql</artifactId>
-      <scope>runtime</scope>
     </dependency>
 
     <!-- Tests -->
diff --git a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java
index ce4eddf..932191c 100644
--- a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java
+++ b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java
@@ -19,17 +19,28 @@
 
 package org.apache.druid.query.lookup.namespace;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.mysql.jdbc.NonRegisteringDriver;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.apache.druid.utils.ConnectionUriUtils;
+import org.apache.druid.utils.Throwables;
 import org.joda.time.Period;
+import org.postgresql.Driver;
 
 import javax.annotation.Nullable;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
+import java.sql.SQLException;
 import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
 
 /**
  *
@@ -61,11 +72,15 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
       @NotNull @JsonProperty(value = "valueColumn", required = true) final String valueColumn,
       @JsonProperty(value = "tsColumn", required = false) @Nullable final String tsColumn,
       @JsonProperty(value = "filter", required = false) @Nullable final String filter,
-      @Min(0) @JsonProperty(value = "pollPeriod", required = false) @Nullable final Period pollPeriod
+      @Min(0) @JsonProperty(value = "pollPeriod", required = false) @Nullable final Period pollPeriod,
+      @JacksonInject JdbcAccessSecurityConfig securityConfig
   )
   {
     this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig");
-    Preconditions.checkNotNull(connectorConfig.getConnectURI(), "connectorConfig.connectURI");
+    // Check the properties in the connection URL. Note that JdbcExtractionNamespace doesn't use
+    // MetadataStorageConnectorConfig.getDbcpProperties(). If we want to use them,
+    // those DBCP properties should be validated using the same logic.
+    checkConnectionURL(connectorConfig.getConnectURI(), securityConfig);
     this.table = Preconditions.checkNotNull(table, "table");
     this.keyColumn = Preconditions.checkNotNull(keyColumn, "keyColumn");
     this.valueColumn = Preconditions.checkNotNull(valueColumn, "valueColumn");
@@ -74,6 +89,89 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
     this.pollPeriod = pollPeriod == null ? new Period(0L) : pollPeriod;
   }
 
+  /**
+   * Checks whether the given JDBC URL contains properties that are not allowed, and throws if it does.
+   * Does nothing when {@link JdbcAccessSecurityConfig#isEnforceAllowedProperties()} returns false.
+   *
+   * This method should be in sync with the following methods:
+   * - {@code org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL()}
+   * - {@code org.apache.druid.firehose.sql.MySQLFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
+   * - {@code org.apache.druid.firehose.sql.PostgresqlFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
+   *
+   * @see JdbcAccessSecurityConfig#getAllowedProperties()
+   */
+  private static void checkConnectionURL(String url, JdbcAccessSecurityConfig securityConfig)
+  {
+    Preconditions.checkNotNull(url, "connectorConfig.connectURI");
+
+    if (!securityConfig.isEnforceAllowedProperties()) {
+      // Property validation is disabled, so there is nothing to check.
+      return;
+    }
+
+    @Nullable final Properties properties; // null when url has an invalid format
+
+    if (url.startsWith(ConnectionUriUtils.MYSQL_PREFIX)) {
+      try {
+        NonRegisteringDriver driver = new NonRegisteringDriver();
+        properties = driver.parseURL(url, null);
+      }
+      catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+      catch (Throwable e) {
+        if (Throwables.isThrowable(e, NoClassDefFoundError.class)
+            || Throwables.isThrowable(e, ClassNotFoundException.class)) {
+          if (e.getMessage() != null && e.getMessage().contains("com/mysql/jdbc/NonRegisteringDriver")) {
+            throw new RuntimeException(
+                "Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
+                e
+            );
+          }
+        }
+        throw new RuntimeException(e);
+      }
+    } else if (url.startsWith(ConnectionUriUtils.POSTGRES_PREFIX)) {
+      try {
+        properties = Driver.parseURL(url, null);
+      }
+      catch (Throwable e) {
+        if (Throwables.isThrowable(e, NoClassDefFoundError.class)
+            || Throwables.isThrowable(e, ClassNotFoundException.class)) {
+          if (e.getMessage() != null && e.getMessage().contains("org/postgresql/Driver")) {
+            throw new RuntimeException(
+                "Failed to find PostgreSQL driver class. "
+                + "Please check the PostgreSQL connector version 42.2.14 is in the classpath",
+                e
+            );
+          }
+        }
+        throw new RuntimeException(e);
+      }
+    } else {
+      if (securityConfig.isAllowUnknownJdbcUrlFormat()) {
+        properties = new Properties();
+      } else {
+        final String[] urlParts = url.split(":"); // unknown format but it is not allowed
+        throw new IAE("Unknown JDBC connection scheme: %s", urlParts.length > 1 ? urlParts[1] : url);
+      }
+    }
+
+    if (properties == null) {
+      // There is something wrong with the URL format.
+      throw new IAE("Invalid URL format [%s]", url);
+    }
+
+    final Set<String> propertyKeys = Sets.newHashSetWithExpectedSize(properties.size());
+    properties.forEach((k, v) -> propertyKeys.add((String) k));
+
+    ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
+        propertyKeys,
+        securityConfig.getSystemPropertyPrefixes(),
+        securityConfig.getAllowedProperties()
+    );
+  }
+
   public MetadataStorageConnectorConfig getConnectorConfig()
   {
     return connectorConfig;
diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java
new file mode 100644
index 0000000..03141dd
--- /dev/null
+++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.lookup.namespace;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.metadata.MetadataStorageConnectorConfig;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.joda.time.Period;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+import java.util.Set;
+
+@RunWith(Enclosed.class)
+public class JdbcExtractionNamespaceUrlCheckTest
+{
+  private static final String TABLE_NAME = "abstractDbRenameTest";
+  private static final String KEY_NAME = "keyName";
+  private static final String VAL_NAME = "valName";
+  private static final String TS_COLUMN = "tsColumn";
+  private static final Set<String> ALLOWED_PROPERTIES = ImmutableSet.of("valid_key1", "valid_key2");
+
+  /**
+   * Creates a {@link JdbcExtractionNamespace} whose connector config reports the given connect URI. The tests
+   * below expect any URL check failure to be thrown from this constructor call.
+   */
+  private static JdbcExtractionNamespace createNamespace(
+      final String connectUri,
+      final JdbcAccessSecurityConfig securityConfig
+  )
+  {
+    final MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return connectUri;
+      }
+    };
+    return new JdbcExtractionNamespace(
+        connectorConfig,
+        TABLE_NAME,
+        KEY_NAME,
+        VAL_NAME,
+        TS_COLUMN,
+        "some filter",
+        new Period(10),
+        securityConfig
+    );
+  }
+
+  /**
+   * Security config exposing the test allow list with the given enforcement flag; every other setting keeps
+   * whatever {@link JdbcAccessSecurityConfig} returns by default.
+   */
+  private static JdbcAccessSecurityConfig securityConfig(final boolean enforceAllowedProperties)
+  {
+    return new JdbcAccessSecurityConfig()
+    {
+      @Override
+      public Set<String> getAllowedProperties()
+      {
+        return ALLOWED_PROPERTIES;
+      }
+
+      @Override
+      public boolean isEnforceAllowedProperties()
+      {
+        return enforceAllowedProperties;
+      }
+    };
+  }
+
+  /**
+   * Security config that enforces the test allow list and pins whether unknown JDBC URL formats are allowed.
+   */
+  private static JdbcAccessSecurityConfig enforcingConfig(final boolean allowUnknownJdbcUrlFormat)
+  {
+    return new JdbcAccessSecurityConfig()
+    {
+      @Override
+      public Set<String> getAllowedProperties()
+      {
+        return ALLOWED_PROPERTIES;
+      }
+
+      @Override
+      public boolean isAllowUnknownJdbcUrlFormat()
+      {
+        return allowUnknownJdbcUrlFormat;
+      }
+
+      @Override
+      public boolean isEnforceAllowedProperties()
+      {
+        return true;
+      }
+    };
+  }
+
+  /** URL checks for {@code jdbc:mysql:} connect URIs. */
+  public static class MySqlTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testCreateInstanceWhenUrlHasOnlyAllowedProperties()
+    {
+      createNamespace("jdbc:mysql://localhost:3306/db?valid_key1=val1&valid_key2=val2", securityConfig(true));
+    }
+
+    @Test
+    public void testThrowWhenUrlHasNonAllowedPropertiesWhenEnforcingAllowedProperties()
+    {
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("The property [invalid_key1] is not in the allowed list [valid_key1, valid_key2]");
+      createNamespace("jdbc:mysql://localhost:3306/db?invalid_key1=val1&valid_key2=val2", securityConfig(true));
+    }
+
+    @Test
+    public void testWhenUrlHasNonAllowedPropertiesWhenNotEnforcingAllowedProperties()
+    {
+      createNamespace("jdbc:mysql://localhost:3306/db?invalid_key1=val1&valid_key2=val2", securityConfig(false));
+    }
+
+    @Test
+    public void testWhenInvalidUrlFormat()
+    {
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Invalid URL format [jdbc:mysql:/invalid-url::3006]");
+      createNamespace("jdbc:mysql:/invalid-url::3006", securityConfig(true));
+    }
+  }
+
+  /** URL checks for {@code jdbc:postgresql:} connect URIs. */
+  public static class PostgreSqlTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testCreateInstanceWhenUrlHasOnlyAllowedProperties()
+    {
+      createNamespace("jdbc:postgresql://localhost:5432/db?valid_key1=val1&valid_key2=val2", securityConfig(true));
+    }
+
+    @Test
+    public void testThrowWhenUrlHasNonAllowedPropertiesWhenEnforcingAllowedProperties()
+    {
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("The property [invalid_key1] is not in the allowed list [valid_key1, valid_key2]");
+      createNamespace("jdbc:postgresql://localhost:5432/db?invalid_key1=val1&valid_key2=val2", securityConfig(true));
+    }
+
+    @Test
+    public void testWhenUrlHasNonAllowedPropertiesWhenNotEnforcingAllowedProperties()
+    {
+      createNamespace("jdbc:postgresql://localhost:5432/db?invalid_key1=val1&valid_key2=val2", securityConfig(false));
+    }
+
+    @Test
+    public void testWhenInvalidUrlFormat()
+    {
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Invalid URL format [jdbc:postgresql://invalid-url::3006]");
+      createNamespace("jdbc:postgresql://invalid-url::3006", securityConfig(true));
+    }
+  }
+
+  /** URL checks for a connect URI whose scheme is neither MySQL nor PostgreSQL. */
+  public static class UnknownSchemeTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testThrowWhenUnknownFormatIsNotAllowed()
+    {
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Unknown JDBC connection scheme: mydb");
+      createNamespace("jdbc:mydb://localhost:5432/db?valid_key1=val1&valid_key2=val2", enforcingConfig(false));
+    }
+
+    @Test
+    public void testSkipUrlParsingWhenUnknownFormatIsAllowed()
+    {
+      createNamespace("jdbc:mydb://localhost:5432/db?valid_key1=val1&valid_key2=val2", enforcingConfig(true));
+    }
+  }
+}
diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
index 94cba98..169061b 100644
--- a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
+++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java
@@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
 import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
 import org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -42,7 +43,7 @@ import java.util.Collections;
 public class JdbcCacheGeneratorTest
 {
   private static final MetadataStorageConnectorConfig MISSING_METADATA_STORAGE_CONNECTOR_CONFIG =
-      createMetadataStorageConnectorConfig("postgresql");
+      createMetadataStorageConnectorConfig("mydb");
 
   private static final CacheScheduler.EntryImpl<JdbcExtractionNamespace> KEY =
       EasyMock.mock(CacheScheduler.EntryImpl.class);
@@ -127,7 +128,8 @@ public class JdbcCacheGeneratorTest
         "valueColumn",
         tsColumn,
         "filter",
-        Period.ZERO
+        Period.ZERO,
+        new JdbcAccessSecurityConfig()
     );
   }
 }
diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
index 84086b8..bb53711 100644
--- a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
+++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java
@@ -19,12 +19,15 @@
 
 package org.apache.druid.server.lookup.namespace.cache;
 
+import com.fasterxml.jackson.databind.InjectableValues.Std;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
@@ -34,7 +37,7 @@ import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.query.lookup.namespace.CacheGenerator;
 import org.apache.druid.query.lookup.namespace.ExtractionNamespace;
 import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
-import org.apache.druid.server.ServerTestHelper;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
 import org.apache.druid.server.lookup.namespace.JdbcCacheGenerator;
 import org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -73,6 +76,7 @@ public class JdbcExtractionNamespaceTest
 
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
   private static final Logger log = new Logger(JdbcExtractionNamespaceTest.class);
   private static final String TABLE_NAME = "abstractDbRenameTest";
   private static final String KEY_NAME = "keyName";
@@ -376,7 +380,8 @@ public class JdbcExtractionNamespaceTest
         VAL_NAME,
         tsColumn,
         null,
-        new Period(0)
+        new Period(0),
+        new JdbcAccessSecurityConfig()
     );
     try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
       CacheSchedulerTest.waitFor(entry);
@@ -407,7 +412,8 @@ public class JdbcExtractionNamespaceTest
         VAL_NAME,
         tsColumn,
         FILTER_COLUMN + "='1'",
-        new Period(0)
+        new Period(0),
+        new JdbcAccessSecurityConfig()
     );
     try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
       CacheSchedulerTest.waitFor(entry);
@@ -472,6 +478,7 @@ public class JdbcExtractionNamespaceTest
   @Test
   public void testSerde() throws IOException
   {
+    final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig();
     final JdbcExtractionNamespace extractionNamespace = new JdbcExtractionNamespace(
         derbyConnectorRule.getMetadataConnectorConfig(),
         TABLE_NAME,
@@ -479,11 +486,14 @@ public class JdbcExtractionNamespaceTest
         VAL_NAME,
         tsColumn,
         "some filter",
-        new Period(10)
+        new Period(10),
+        securityConfig
     );
+    final ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(new Std().addValue(JdbcAccessSecurityConfig.class, securityConfig));
 
-    final ExtractionNamespace extractionNamespace2 = ServerTestHelper.MAPPER.readValue(
-        ServerTestHelper.MAPPER.writeValueAsBytes(extractionNamespace),
+    final ExtractionNamespace extractionNamespace2 = mapper.readValue(
+        mapper.writeValueAsBytes(extractionNamespace),
         ExtractionNamespace.class
     );
 
@@ -500,7 +510,8 @@ public class JdbcExtractionNamespaceTest
         VAL_NAME,
         tsColumn,
         null,
-        new Period(10)
+        new Period(10),
+        new JdbcAccessSecurityConfig()
     );
     CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace);
 
diff --git a/extensions-core/lookups-cached-single/pom.xml b/extensions-core/lookups-cached-single/pom.xml
index 9a8d210..28d9cd4 100644
--- a/extensions-core/lookups-cached-single/pom.xml
+++ b/extensions-core/lookups-cached-single/pom.xml
@@ -96,12 +96,15 @@
       <artifactId>guava</artifactId>
       <scope>provided</scope>
     </dependency>
-
-    <!-- Included to improve the out-of-the-box experience for supported JDBC connectors -->
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>${mysql.version}</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.postgresql</groupId>
       <artifactId>postgresql</artifactId>
-      <scope>runtime</scope>
     </dependency>
 
     <!-- Tests -->
diff --git a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcher.java b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcher.java
index 6336de5..ddcb5c6 100644
--- a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcher.java
+++ b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcher.java
@@ -19,15 +19,23 @@
 
 package org.apache.druid.server.lookup.jdbc;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.mysql.jdbc.NonRegisteringDriver;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
 import org.apache.druid.server.lookup.DataFetcher;
+import org.apache.druid.utils.ConnectionUriUtils;
+import org.apache.druid.utils.Throwables;
+import org.postgresql.Driver;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
@@ -39,6 +47,8 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
 import java.util.function.Supplier;
 
 public class JdbcDataFetcher implements DataFetcher<String, String>
@@ -71,12 +81,16 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
       @JsonProperty("table") String table,
       @JsonProperty("keyColumn") String keyColumn,
       @JsonProperty("valueColumn") String valueColumn,
-      @JsonProperty("streamingFetchSize") @Nullable Integer streamingFetchSize
+      @JsonProperty("streamingFetchSize") @Nullable Integer streamingFetchSize,
+      @JacksonInject JdbcAccessSecurityConfig securityConfig
   )
   {
     this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig");
     this.streamingFetchSize = streamingFetchSize == null ? DEFAULT_STREAMING_FETCH_SIZE : streamingFetchSize;
-    Preconditions.checkNotNull(connectorConfig.getConnectURI(), "connectorConfig.connectURI");
+    // Check the properties in the connection URL. Note that JdbcDataFetcher doesn't use
+    // MetadataStorageConnectorConfig.getDbcpProperties(). If we want to use them,
+    // those DBCP properties should be validated using the same logic.
+    checkConnectionURL(connectorConfig.getConnectURI(), securityConfig);
     this.table = Preconditions.checkNotNull(table, "table");
     this.keyColumn = Preconditions.checkNotNull(keyColumn, "keyColumn");
     this.valueColumn = Preconditions.checkNotNull(valueColumn, "valueColumn");
@@ -107,6 +121,89 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
     dbi.registerMapper(new KeyValueResultSetMapper(keyColumn, valueColumn));
   }
 
+  /**
+   * Checks that the given URL contains only allowed properties; does nothing unless enforcement is enabled.
+   *
+   * This method should be in sync with the following methods:
+   * - {@code org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()}
+   * - {@code org.apache.druid.firehose.sql.MySQLFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
+   * - {@code org.apache.druid.firehose.sql.PostgresqlFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
+   *
+   * @param url            JDBC connection URL to validate; must not be null
+   * @param securityConfig supplies the enforcement flags, system property prefixes and allowed properties
+   * @throws IAE if the URL cannot be parsed, or its scheme is unknown and unknown formats are not allowed
+   * @see JdbcAccessSecurityConfig#getAllowedProperties()
+   */
+  private static void checkConnectionURL(String url, JdbcAccessSecurityConfig securityConfig)
+  {
+    Preconditions.checkNotNull(url, "connectorConfig.connectURI");
+
+    if (!securityConfig.isEnforceAllowedProperties()) {
+      // You don't want to do anything with properties.
+      return;
+    }
+
+    @Nullable final Properties properties;
+
+    if (url.startsWith(ConnectionUriUtils.MYSQL_PREFIX)) {
+      try {
+        properties = new NonRegisteringDriver().parseURL(url, null);
+      }
+      catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+      catch (Throwable e) {
+        final String message = e.getMessage(); // getMessage() may return null; guard before matching on it
+        if ((Throwables.isThrowable(e, NoClassDefFoundError.class)
+             || Throwables.isThrowable(e, ClassNotFoundException.class))
+            && message != null && message.contains("com/mysql/jdbc/NonRegisteringDriver")) {
+          throw new RuntimeException(
+              "Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
+              e
+          );
+        }
+        throw new RuntimeException(e);
+      }
+    } else if (url.startsWith(ConnectionUriUtils.POSTGRES_PREFIX)) {
+      try {
+        properties = Driver.parseURL(url, null);
+      }
+      catch (Throwable e) {
+        final String message = e.getMessage(); // getMessage() may return null; guard before matching on it
+        if ((Throwables.isThrowable(e, NoClassDefFoundError.class)
+             || Throwables.isThrowable(e, ClassNotFoundException.class))
+            && message != null && message.contains("org/postgresql/Driver")) {
+          throw new RuntimeException(
+              "Failed to find PostgreSQL driver class. "
+              + "Please check the PostgreSQL connector version 42.2.14 is in the classpath",
+              e
+          );
+        }
+        throw new RuntimeException(e);
+      }
+    } else if (securityConfig.isAllowUnknownJdbcUrlFormat()) {
+      properties = new Properties();
+    } else {
+      // Unknown format but it is not allowed. Report the whole URL when it has no ':'-separated scheme token.
+      final String[] tokens = url.split(":");
+      throw new IAE("Unknown JDBC connection scheme: %s", tokens.length > 1 ? tokens[1] : url);
+    }
+
+    if (properties == null) {
+      // parseURL() returned null: there is something wrong with the URL format.
+      throw new IAE("Invalid URL format [%s]", url);
+    }
+
+    final Set<String> propertyKeys = Sets.newHashSetWithExpectedSize(properties.size());
+    properties.forEach((k, v) -> propertyKeys.add((String) k));
+
+    ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
+        propertyKeys,
+        securityConfig.getSystemPropertyPrefixes(),
+        securityConfig.getAllowedProperties()
+    );
+  }
+
   @Override
   public Iterable<Map.Entry<String, String>> fetchAll()
   {
@@ -232,7 +329,7 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
       if (e.getMessage().contains("No suitable driver found")) {
         throw new ISE(
             e,
-            "JDBC driver JAR files missing from extensions/druid-lookups-cached-single directory"
+            "JDBC driver JAR files missing in the classpath"
         );
       } else {
         throw e;
diff --git a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcherTest.java b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcherTest.java
index 9cb3630..1139d4c 100644
--- a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcherTest.java
+++ b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcherTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.lookup.jdbc;
 
+import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
@@ -26,6 +27,7 @@ import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
 import org.apache.druid.server.lookup.DataFetcher;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.After;
@@ -74,7 +76,8 @@ public class JdbcDataFetcherTest extends InitializedNullHandlingTest
           "tableName",
           "keyColumn",
           "valueColumn",
-          100
+          100,
+          new JdbcAccessSecurityConfig()
       );
 
       handle = derbyConnectorRule.getConnector().getDBI().open();
@@ -155,14 +158,17 @@ public class JdbcDataFetcherTest extends InitializedNullHandlingTest
     @Test
     public void testSerDesr() throws IOException
     {
+      final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig();
       JdbcDataFetcher jdbcDataFetcher = new JdbcDataFetcher(
           new MetadataStorageConnectorConfig(),
           "table",
           "keyColumn",
           "ValueColumn",
-          100
+          100,
+          securityConfig
       );
       DefaultObjectMapper mapper = new DefaultObjectMapper();
+      mapper.setInjectableValues(new Std().addValue(JdbcAccessSecurityConfig.class, securityConfig));
       String jdbcDataFetcherSer = mapper.writeValueAsString(jdbcDataFetcher);
       Assert.assertEquals(jdbcDataFetcher, mapper.readerFor(DataFetcher.class).readValue(jdbcDataFetcherSer));
     }
@@ -213,7 +219,8 @@ public class JdbcDataFetcherTest extends InitializedNullHandlingTest
           TABLE_NAME,
           KEY_COLUMN,
           VALUE_COLUMN,
-          null
+          null,
+          new JdbcAccessSecurityConfig()
       );
     }
 
@@ -244,7 +251,7 @@ public class JdbcDataFetcherTest extends InitializedNullHandlingTest
     private void test(Runnable runnable)
     {
       exception.expect(IllegalStateException.class);
-      exception.expectMessage("JDBC driver JAR files missing from extensions/druid-lookups-cached-single directory");
+      exception.expectMessage("JDBC driver JAR files missing in the classpath");
 
       runnable.run();
     }
@@ -252,8 +259,8 @@ public class JdbcDataFetcherTest extends InitializedNullHandlingTest
     @SuppressWarnings("SameParameterValue")
     private static MetadataStorageConnectorConfig createMissingMetadataStorageConnectorConfig()
     {
-      String type = "postgresql";
-      String json = "{\"connectURI\":\"jdbc:" + type + "://localhost:5432\"}";
+      String type = "mydb";
+      String json = "{\"connectURI\":\"jdbc:" + type + "://localhost:3306/\"}";
       try {
         return new ObjectMapper().readValue(json, MetadataStorageConnectorConfig.class);
       }
diff --git a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcherUrlCheckTest.java b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcherUrlCheckTest.java
new file mode 100644
index 0000000..1dad8e9
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/jdbc/JdbcDataFetcherUrlCheckTest.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.lookup.jdbc;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.metadata.MetadataStorageConnectorConfig;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+import java.util.Set;
+
+@RunWith(Enclosed.class)
+public class JdbcDataFetcherUrlCheckTest
+{
+  private static final String TABLE_NAME = "tableName";
+  private static final String KEY_COLUMN = "keyColumn";
+  private static final String VALUE_COLUMN = "valueColumn";
+
+  public static class MySqlTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testCreateInstanceWhenUrlHasOnlyAllowedProperties()
+    {
+      // Both query keys in the URI (valid_key1, valid_key2) are members of the allow list returned by
+      // the security config below, and enforcement is switched on.
+      final MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+      {
+        @Override
+        public String getConnectURI()
+        {
+          return "jdbc:mysql://localhost:3306/db?valid_key1=val1&valid_key2=val2";
+        }
+      };
+      final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
+      {
+        @Override
+        public boolean isEnforceAllowedProperties()
+        {
+          return true;
+        }
+
+        @Override
+        public Set<String> getAllowedProperties()
+        {
+          return ImmutableSet.of("valid_key1", "valid_key2");
+        }
+      };
+
+      // No exception is registered with the ExpectedException rule, so the test passes only if the
+      // constructor returns normally.
+      new JdbcDataFetcher(connectorConfig, TABLE_NAME, KEY_COLUMN, VALUE_COLUMN, 100, securityConfig);
+    }
+
+    @Test
+    public void testThrowWhenUrlHasDisallowedPropertiesWhenEnforcingAllowedProperties()
+    {
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("The property [invalid_key1] is not in the allowed list [valid_key1, valid_key2]");
+      // invalid_key1 in the URI is absent from the allow list below, and enforcement is switched on, so
+      // the constructor is expected to fail with the message registered above.
+      final MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+      {
+        @Override
+        public String getConnectURI()
+        {
+          return "jdbc:mysql://localhost:3306/db?invalid_key1=val1&valid_key2=val2";
+        }
+      };
+      final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
+      {
+        @Override
+        public boolean isEnforceAllowedProperties()
+        {
+          return true;
+        }
+
+        @Override
+        public Set<String> getAllowedProperties()
+        {
+          return ImmutableSet.of("valid_key1", "valid_key2");
+        }
+      };
+
+      // The instance is never used: the constructor call is the only statement that can raise the
+      // expected exception.
+      new JdbcDataFetcher(connectorConfig, TABLE_NAME, KEY_COLUMN, VALUE_COLUMN, 100, securityConfig);
+    }
+
+    @Test
+    public void testWhenUrlHasDisallowedPropertiesWhenNotEnforcingAllowedProperties()
+    {
+      // invalid_key1 is not in the allow list below, but isEnforceAllowedProperties() returns false
+      // for this security config.
+      final MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+      {
+        @Override
+        public String getConnectURI()
+        {
+          return "jdbc:mysql://localhost:3306/db?invalid_key1=val1&valid_key2=val2";
+        }
+      };
+      final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
+      {
+        @Override
+        public boolean isEnforceAllowedProperties()
+        {
+          return false;
+        }
+
+        @Override
+        public Set<String> getAllowedProperties()
+        {
+          return ImmutableSet.of("valid_key1", "valid_key2");
+        }
+      };
+
+      // No exception is registered with the ExpectedException rule, so the test passes only if the
+      // constructor returns normally despite the key that is not allow-listed.
+      new JdbcDataFetcher(connectorConfig, TABLE_NAME, KEY_COLUMN, VALUE_COLUMN, 100, securityConfig);
+    }
+
+    @Test
+    public void testWhenInvalidUrlFormat()
+    {
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Invalid URL format [jdbc:mysql:/invalid-url::3006]");
+      // The connect URI below is deliberately malformed; the test expects the constructor to report
+      // it with the "Invalid URL format" message registered above.
+      final MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+      {
+        @Override
+        public String getConnectURI()
+        {
+          return "jdbc:mysql:/invalid-url::3006";
+        }
+      };
+      final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
+      {
+        @Override
+        public boolean isEnforceAllowedProperties()
+        {
+          return true;
+        }
+
+        @Override
+        public Set<String> getAllowedProperties()
+        {
+          return ImmutableSet.of("valid_key1", "valid_key2");
+        }
+      };
+
+      // The instance is never used: the constructor call is the only statement that can raise the
+      // expected exception.
+      new JdbcDataFetcher(connectorConfig, TABLE_NAME, KEY_COLUMN, VALUE_COLUMN, 100, securityConfig);
+    }
+  }
+
+  public static class PostgreSqlTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testCreateInstanceWhenUrlHasOnlyAllowedProperties()
+    {
+      // Both query keys in the URI (valid_key1, valid_key2) are members of the allow list returned by
+      // the security config below, and enforcement is switched on.
+      final MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+      {
+        @Override
+        public String getConnectURI()
+        {
+          return "jdbc:postgresql://localhost:5432/db?valid_key1=val1&valid_key2=val2";
+        }
+      };
+      final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
+      {
+        @Override
+        public boolean isEnforceAllowedProperties()
+        {
+          return true;
+        }
+
+        @Override
+        public Set<String> getAllowedProperties()
+        {
+          return ImmutableSet.of("valid_key1", "valid_key2");
+        }
+      };
+
+      // No exception is registered with the ExpectedException rule, so the test passes only if the
+      // constructor returns normally.
+      new JdbcDataFetcher(connectorConfig, TABLE_NAME, KEY_COLUMN, VALUE_COLUMN, 100, securityConfig);
+    }
+
+    @Test
+    public void testThrowWhenUrlHasDisallowedPropertiesWhenEnforcingAllowedProperties()
+    {
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("The property [invalid_key1] is not in the allowed list [valid_key1, valid_key2]");
+      // invalid_key1 in the URI is absent from the allow list below, and enforcement is switched on, so
+      // the constructor is expected to fail with the message registered above.
+      final MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+      {
+        @Override
+        public String getConnectURI()
+        {
+          return "jdbc:postgresql://localhost:5432/db?invalid_key1=val1&valid_key2=val2";
+        }
+      };
+      final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
+      {
+        @Override
+        public boolean isEnforceAllowedProperties()
+        {
+          return true;
+        }
+
+        @Override
+        public Set<String> getAllowedProperties()
+        {
+          return ImmutableSet.of("valid_key1", "valid_key2");
+        }
+      };
+
+      // The instance is never used: the constructor call is the only statement that can raise the
+      // expected exception.
+      new JdbcDataFetcher(connectorConfig, TABLE_NAME, KEY_COLUMN, VALUE_COLUMN, 100, securityConfig);
+    }
+
+    @Test
+    public void testWhenUrlHasDisallowedPropertiesWhenNotEnforcingAllowedProperties()
+    {
+      // invalid_key1 is not in the allow list below, but isEnforceAllowedProperties() returns false
+      // for this security config.
+      final MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+      {
+        @Override
+        public String getConnectURI()
+        {
+          return "jdbc:postgresql://localhost:5432/db?invalid_key1=val1&valid_key2=val2";
+        }
+      };
+      final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
+      {
+        @Override
+        public boolean isEnforceAllowedProperties()
+        {
+          return false;
+        }
+
+        @Override
+        public Set<String> getAllowedProperties()
+        {
+          return ImmutableSet.of("valid_key1", "valid_key2");
+        }
+      };
+
+      // No exception is registered with the ExpectedException rule, so the test passes only if the
+      // constructor returns normally despite the key that is not allow-listed.
+      new JdbcDataFetcher(connectorConfig, TABLE_NAME, KEY_COLUMN, VALUE_COLUMN, 100, securityConfig);
+    }
+
+    @Test
+    public void testWhenInvalidUrlFormat()
+    {
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Invalid URL format [jdbc:postgresql://invalid-url::3006]");
+      // The connect URI below is deliberately malformed; the test expects the constructor to report
+      // it with the "Invalid URL format" message registered above.
+      final MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+      {
+        @Override
+        public String getConnectURI()
+        {
+          return "jdbc:postgresql://invalid-url::3006";
+        }
+      };
+      final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
+      {
+        @Override
+        public boolean isEnforceAllowedProperties()
+        {
+          return true;
+        }
+
+        @Override
+        public Set<String> getAllowedProperties()
+        {
+          return ImmutableSet.of("valid_key1", "valid_key2");
+        }
+      };
+
+      // The instance is never used: the constructor call is the only statement that can raise the
+      // expected exception.
+      new JdbcDataFetcher(connectorConfig, TABLE_NAME, KEY_COLUMN, VALUE_COLUMN, 100, securityConfig);
+    }
+  }
+
+  public static class UnknownSchemeTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testThrowWhenUnknownFormatIsNotAllowed()
+    {
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Unknown JDBC connection scheme: mydb");
+      // The URI uses the "mydb" scheme and isAllowUnknownJdbcUrlFormat() returns false below, so the
+      // constructor is expected to fail with the "Unknown JDBC connection scheme" message.
+      final MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+      {
+        @Override
+        public String getConnectURI()
+        {
+          return "jdbc:mydb://localhost:5432/db?valid_key1=val1&valid_key2=val2";
+        }
+      };
+      final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
+      {
+        @Override
+        public boolean isAllowUnknownJdbcUrlFormat()
+        {
+          return false;
+        }
+
+        @Override
+        public boolean isEnforceAllowedProperties()
+        {
+          return true;
+        }
+
+        @Override
+        public Set<String> getAllowedProperties()
+        {
+          return ImmutableSet.of("valid_key1", "valid_key2");
+        }
+      };
+
+      // The instance is never used: the constructor call is the only statement that can raise the
+      // expected exception.
+      new JdbcDataFetcher(connectorConfig, TABLE_NAME, KEY_COLUMN, VALUE_COLUMN, 100, securityConfig);
+    }
+
+    @Test
+    public void testSkipUrlParsingWhenUnknownFormatIsAllowed()
+    {
+      // Same "mydb" scheme as in the rejecting test, but here isAllowUnknownJdbcUrlFormat() returns
+      // true while the allow list is still enforced.
+      final MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
+      {
+        @Override
+        public String getConnectURI()
+        {
+          return "jdbc:mydb://localhost:5432/db?valid_key1=val1&valid_key2=val2";
+        }
+      };
+      final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
+      {
+        @Override
+        public boolean isAllowUnknownJdbcUrlFormat()
+        {
+          return true;
+        }
+
+        @Override
+        public boolean isEnforceAllowedProperties()
+        {
+          return true;
+        }
+
+        @Override
+        public Set<String> getAllowedProperties()
+        {
+          return ImmutableSet.of("valid_key1", "valid_key2");
+        }
+      };
+
+      // No exception is registered with the ExpectedException rule, so the test passes only if the
+      // constructor returns normally for the unrecognized scheme.
+      new JdbcDataFetcher(connectorConfig, TABLE_NAME, KEY_COLUMN, VALUE_COLUMN, 100, securityConfig);
+    }
+  }
+}
diff --git a/extensions-core/mysql-metadata-storage/pom.xml b/extensions-core/mysql-metadata-storage/pom.xml
index f78c49c..f5ce2f8 100644
--- a/extensions-core/mysql-metadata-storage/pom.xml
+++ b/extensions-core/mysql-metadata-storage/pom.xml
@@ -88,6 +88,11 @@
             <artifactId>commons-dbcp2</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnector.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnector.java
index 2c08a63..5b622a6 100644
--- a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnector.java
+++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnector.java
@@ -19,13 +19,24 @@
 
 package org.apache.druid.firehose.sql;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Sets;
+import com.mysql.jdbc.NonRegisteringDriver;
 import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.apache.druid.utils.Throwables;
 import org.skife.jdbi.v2.DBI;
 
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.Set;
+
 
 @JsonTypeName("mysql")
 public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
@@ -33,12 +44,14 @@ public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
   private final DBI dbi;
   private final MetadataStorageConnectorConfig connectorConfig;
 
+  @JsonCreator
   public MySQLFirehoseDatabaseConnector(
-      @JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig
+      @JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig,
+      @JacksonInject JdbcAccessSecurityConfig securityConfig
   )
   {
     this.connectorConfig = connectorConfig;
-    final BasicDataSource datasource = getDatasource(connectorConfig);
+    final BasicDataSource datasource = getDatasource(connectorConfig, securityConfig);
     datasource.setDriverClassLoader(getClass().getClassLoader());
     datasource.setDriverClassName("com.mysql.jdbc.Driver");
     this.dbi = new DBI(datasource);
@@ -55,4 +68,39 @@ public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
   {
     return dbi;
   }
+
+  /** Returns the property keys the MySQL driver finds in {@code connectUrl}; IAE if parseURL returns null. */
+  @Override
+  public Set<String> findPropertyKeysFromConnectURL(String connectUrl)
+  {
+    // This method should be in sync with
+    // - org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL()
+    // - org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()
+    Properties properties;
+    try {
+      properties = new NonRegisteringDriver().parseURL(connectUrl, null);
+    }
+    catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+    catch (Throwable e) {
+      // getMessage() may be null; guard it so that an NPE thrown here cannot mask the original failure.
+      final String message = e.getMessage();
+      final boolean missingClass = Throwables.isThrowable(e, NoClassDefFoundError.class)
+                                   || Throwables.isThrowable(e, ClassNotFoundException.class);
+      if (missingClass && message != null && message.contains("com/mysql/jdbc/NonRegisteringDriver")) {
+        throw new RuntimeException(
+            "Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
+            e
+        );
+      }
+      throw new RuntimeException(e);
+    }
+    if (properties == null) {
+      throw new IAE("Invalid URL format for MySQL: [%s]", connectUrl);
+    }
+    Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size());
+    properties.forEach((k, v) -> keys.add((String) k));
+    return keys;
+  }
 }
diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnectorTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnectorTest.java
new file mode 100644
index 0000000..4778a42
--- /dev/null
+++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnectorTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.firehose.sql;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.MetadataStorageConnectorConfig;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Set;
+
+public class MySQLFirehoseDatabaseConnectorTest
+{
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testSuccessWhenNoPropertyInUriAndNoAllowlist()
+  {
+    // An empty allow list is enforced here, while the connect URI below carries no query
+    // properties of its own.
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:mysql://localhost:3306/test";
+      }
+    };
+
+    // No exception is registered with the ExpectedException rule, so the constructor has to
+    // return normally for this test to pass.
+    new MySQLFirehoseDatabaseConnector(uriConfig, security);
+  }
+
+  @Test
+  public void testSuccessWhenAllowlistAndNoProperty()
+  {
+    // The enforced allow list contains "user", while the connect URI below carries no query
+    // properties of its own.
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user"));
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:mysql://localhost:3306/test";
+      }
+    };
+
+    // No exception is registered with the ExpectedException rule, so the constructor has to
+    // return normally for this test to pass.
+    new MySQLFirehoseDatabaseConnector(uriConfig, security);
+  }
+
+  @Test
+  public void testFailWhenNoAllowlistAndHaveProperty()
+  {
+    // The enforced allow list holds only the empty string, so none of the keys in the URI below
+    // (user, password, keyonly) is a member; the test expects [password] to be the reported key.
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(ImmutableSet.of(""));
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:mysql://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("The property [password] is not in the allowed list");
+    new MySQLFirehoseDatabaseConnector(
+        uriConfig,
+        security
+    );
+  }
+
+  @Test
+  public void testSuccessOnlyValidProperty()
+  {
+    // Every key in the URI below (user, password, keyonly) is in the enforced allow list; the
+    // list additionally contains "etc", which the URI does not use.
+    Set<String> allowedKeys = ImmutableSet.of("user", "password", "keyonly", "etc");
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(allowedKeys);
+
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:mysql://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    // No exception is registered with the ExpectedException rule, so the constructor has to
+    // return normally for this test to pass.
+    new MySQLFirehoseDatabaseConnector(uriConfig, security);
+  }
+
+  @Test
+  public void testFailOnlyInvalidProperty()
+  {
+    // Neither "none" nor "nonenone" matches any key in the URI below, and the allow list is
+    // enforced; the test expects [password] to be the reported key.
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(ImmutableSet.of("none", "nonenone"));
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:mysql://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("The property [password] is not in the allowed list");
+    new MySQLFirehoseDatabaseConnector(
+        uriConfig,
+        security
+    );
+  }
+
+  @Test
+  public void testFailValidAndInvalidProperty()
+  {
+    // "user" is allow-listed, but the URI below also carries password and keyonly, which are
+    // not; the test expects [password] to be the reported key.
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user", "nonenone"));
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:mysql://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("The property [password] is not in the allowed list");
+    new MySQLFirehoseDatabaseConnector(
+        uriConfig,
+        security
+    );
+  }
+
+  @Test
+  public void testIgnoreInvalidPropertyWhenNotEnforcingAllowList()
+  {
+    // Only getAllowedProperties() is overridden; isEnforceAllowedProperties() keeps the base-class
+    // behavior, which this test presumably relies on being non-enforcing -- TODO confirm.
+    JdbcAccessSecurityConfig security = new JdbcAccessSecurityConfig()
+    {
+      @Override
+      public Set<String> getAllowedProperties()
+      {
+        return ImmutableSet.of("user", "nonenone");
+      }
+    };
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:mysql://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    // No exception is registered with the ExpectedException rule, so the constructor has to
+    // return normally even though password and keyonly are not in the allow list.
+    new MySQLFirehoseDatabaseConnector(uriConfig, security);
+  }
+
+  @Test
+  public void testFindPropertyKeysFromInvalidConnectUrl()
+  {
+    final String invalidUrl = "jdbc:mysql:/invalid-url::3006";
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return invalidUrl;
+      }
+    };
+    // The expectations are registered only after construction, so the constructor itself must
+    // not throw here; only the direct findPropertyKeysFromConnectURL() call may.
+    MySQLFirehoseDatabaseConnector connector =
+        new MySQLFirehoseDatabaseConnector(uriConfig, new JdbcAccessSecurityConfig());
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(StringUtils.format("Invalid URL format for MySQL: [%s]", invalidUrl));
+    connector.findPropertyKeysFromConnectURL(invalidUrl);
+  }
+
+  /** Builds a security config that returns the given allow list and reports it as enforced. */
+  private static JdbcAccessSecurityConfig newSecurityConfigEnforcingAllowList(Set<String> allowedProperties)
+  {
+    return new JdbcAccessSecurityConfig()
+    {
+      @Override
+      public boolean isEnforceAllowedProperties()
+      {
+        return true;
+      }
+      @Override
+      public Set<String> getAllowedProperties()
+      {
+        return allowedProperties;
+      }
+    };
+  }
+}
diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java
index e40d444..bcf3e58 100644
--- a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java
+++ b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java
@@ -19,13 +19,22 @@
 
 package org.apache.druid.firehose;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Sets;
 import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.postgresql.Driver;
 import org.skife.jdbi.v2.DBI;
 
+import java.util.Properties;
+import java.util.Set;
+
 
 @JsonTypeName("postgresql")
 public class PostgresqlFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
@@ -33,12 +42,14 @@ public class PostgresqlFirehoseDatabaseConnector extends SQLFirehoseDatabaseConn
   private final DBI dbi;
   private final MetadataStorageConnectorConfig connectorConfig;
 
+  @JsonCreator
   public PostgresqlFirehoseDatabaseConnector(
-      @JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig
+      @JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig,
+      @JacksonInject JdbcAccessSecurityConfig securityConfig
   )
   {
     this.connectorConfig = connectorConfig;
-    final BasicDataSource datasource = getDatasource(connectorConfig);
+    final BasicDataSource datasource = getDatasource(connectorConfig, securityConfig);
     datasource.setDriverClassLoader(getClass().getClassLoader());
     datasource.setDriverClassName("org.postgresql.Driver");
     this.dbi = new DBI(datasource);
@@ -55,4 +66,21 @@ public class PostgresqlFirehoseDatabaseConnector extends SQLFirehoseDatabaseConn
   {
     return dbi;
   }
+
+  @Override
+  public Set<String> findPropertyKeysFromConnectURL(String connectUri)
+  {
+    // Keep this method in sync with
+    // - org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL()
+    // - org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()
+
+    // The PostgreSQL JDBC driver is embedded, so its Driver class must be loaded at this point.
+    final Properties parsed = Driver.parseURL(connectUri, null);
+    if (parsed == null) {
+      throw new IAE("Invalid URL format for PostgreSQL: [%s]", connectUri);
+    }
+    final Set<String> propertyKeys = Sets.newHashSetWithExpectedSize(parsed.size());
+    parsed.keySet().forEach(key -> propertyKeys.add((String) key));
+    return propertyKeys;
+  }
 }
diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java
index 0c5c4d0..7312c6e 100644
--- a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java
+++ b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java
@@ -57,7 +57,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
   private volatile Boolean canUpsert;
 
   private final String dbTableSchema;
-  
+
   @Inject
   public PostgreSQLConnector(
       Supplier<MetadataStorageConnectorConfig> config,
diff --git a/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnectorTest.java b/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnectorTest.java
new file mode 100644
index 0000000..229f1a4
--- /dev/null
+++ b/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnectorTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.firehose;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.metadata.MetadataStorageConnectorConfig;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Set;
+
+public class PostgresqlFirehoseDatabaseConnectorTest
+{
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testSuccessWhenNoPropertyInUriAndNoAllowlist()
+  {
+    // An empty allow list is enforced here, while the connect URI below carries no query
+    // properties of its own.
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:postgresql://localhost:3306/test";
+      }
+    };
+
+    // No exception is registered with the ExpectedException rule, so the constructor has to
+    // return normally for this test to pass.
+    new PostgresqlFirehoseDatabaseConnector(uriConfig, security);
+  }
+
+  @Test
+  public void testSuccessWhenAllowlistAndNoProperty()
+  {
+    // The enforced allow list contains "user", while the connect URI below carries no query
+    // properties of its own.
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user"));
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:postgresql://localhost:3306/test";
+      }
+    };
+
+    // No exception is registered with the ExpectedException rule, so the constructor has to
+    // return normally for this test to pass.
+    new PostgresqlFirehoseDatabaseConnector(uriConfig, security);
+  }
+
+  @Test
+  public void testFailWhenNoAllowlistAndHaveProperty()
+  {
+    // The enforced allow list holds only the empty string, so none of the keys in the URI below
+    // (user, password, keyonly) is a member; the test expects [keyonly] to be the reported key.
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(ImmutableSet.of(""));
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:postgresql://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("The property [keyonly] is not in the allowed list");
+    new PostgresqlFirehoseDatabaseConnector(
+        uriConfig,
+        security
+    );
+  }
+
+  @Test
+  public void testSuccessOnlyValidProperty()
+  {
+    // Every key in the URI below (user, password, keyonly) is in the enforced allow list; the
+    // list additionally contains "etc", which the URI does not use.
+    Set<String> allowedKeys = ImmutableSet.of("user", "password", "keyonly", "etc");
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(allowedKeys);
+
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:postgresql://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    // No exception is registered with the ExpectedException rule, so the constructor has to
+    // return normally for this test to pass.
+    new PostgresqlFirehoseDatabaseConnector(uriConfig, security);
+  }
+
+  @Test
+  public void testFailOnlyInvalidProperty()
+  {
+    // Neither "none" nor "nonenone" matches any key in the URI below, and the allow list is
+    // enforced; the test expects [keyonly] to be the reported key.
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(ImmutableSet.of("none", "nonenone"));
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:postgresql://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("The property [keyonly] is not in the allowed list");
+    new PostgresqlFirehoseDatabaseConnector(
+        uriConfig,
+        security
+    );
+  }
+
+  @Test
+  public void testFailValidAndInvalidProperty()
+  {
+    // "user" is allow-listed, but the URI below also carries password and keyonly, which are
+    // not; the test expects [keyonly] to be the reported key.
+    JdbcAccessSecurityConfig security = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user", "nonenone"));
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:postgresql://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("The property [keyonly] is not in the allowed list");
+    new PostgresqlFirehoseDatabaseConnector(
+        uriConfig,
+        security
+    );
+  }
+
+  @Test
+  public void testIgnoreInvalidPropertyWhenNotEnforcingAllowList()
+  {
+    // Only getAllowedProperties() is overridden; isEnforceAllowedProperties() keeps the base-class
+    // behavior, which this test presumably relies on being non-enforcing -- TODO confirm.
+    JdbcAccessSecurityConfig security = new JdbcAccessSecurityConfig()
+    {
+      @Override
+      public Set<String> getAllowedProperties()
+      {
+        return ImmutableSet.of("user", "nonenone");
+      }
+    };
+    MetadataStorageConnectorConfig uriConfig = new MetadataStorageConnectorConfig()
+    {
+      @Override
+      public String getConnectURI()
+      {
+        return "jdbc:postgresql://localhost:3306/test?user=maytas&password=secret&keyonly";
+      }
+    };
+
+    // No exception is registered with the ExpectedException rule, so the constructor has to
+    // return normally even though password and keyonly are not in the allow list.
+    new PostgresqlFirehoseDatabaseConnector(uriConfig, security);
+  }
+
+  /** Builds a security config that returns the given allow list and reports it as enforced. */
+  private static JdbcAccessSecurityConfig newSecurityConfigEnforcingAllowList(Set<String> allowedProperties)
+  {
+    return new JdbcAccessSecurityConfig()
+    {
+      @Override
+      public boolean isEnforceAllowedProperties()
+      {
+        return true;
+      }
+      @Override
+      public Set<String> getAllowedProperties()
+      {
+        return allowedProperties;
+      }
+    };
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/initialization/Initialization.java b/server/src/main/java/org/apache/druid/initialization/Initialization.java
index 8b417db..2279fda 100644
--- a/server/src/main/java/org/apache/druid/initialization/Initialization.java
+++ b/server/src/main/java/org/apache/druid/initialization/Initialization.java
@@ -63,6 +63,7 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMediumModule;
 import org.apache.druid.server.emitter.EmitterModule;
 import org.apache.druid.server.initialization.AuthenticatorMapperModule;
 import org.apache.druid.server.initialization.AuthorizerMapperModule;
+import org.apache.druid.server.initialization.ExternalStorageAccessSecurityModule;
 import org.apache.druid.server.initialization.jetty.JettyServerModule;
 import org.apache.druid.server.metrics.MetricsModule;
 import org.apache.druid.server.security.TLSCertificateCheckerModule;
@@ -411,7 +412,8 @@ public class Initialization
         new EscalatorModule(),
         new AuthorizerModule(),
         new AuthorizerMapperModule(),
-        new StartupLoggingModule()
+        new StartupLoggingModule(),
+        new ExternalStorageAccessSecurityModule()
     );
 
     ModuleList actualModules = new ModuleList(baseInjector);
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLFirehoseDatabaseConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLFirehoseDatabaseConnector.java
index 242f04b..4ee06c8 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLFirehoseDatabaseConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLFirehoseDatabaseConnector.java
@@ -21,8 +21,11 @@ package org.apache.druid.metadata;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
 import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.apache.druid.utils.ConnectionUriUtils;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.exceptions.DBIException;
 import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
@@ -32,6 +35,7 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
 import java.sql.SQLException;
 import java.sql.SQLRecoverableException;
 import java.sql.SQLTransientException;
+import java.util.Set;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 public abstract class SQLFirehoseDatabaseConnector
@@ -62,8 +66,15 @@ public abstract class SQLFirehoseDatabaseConnector
                          || (e instanceof DBIException && isTransientException(e.getCause())));
   }
 
-  protected BasicDataSource getDatasource(MetadataStorageConnectorConfig connectorConfig)
+  protected BasicDataSource getDatasource(
+      MetadataStorageConnectorConfig connectorConfig,
+      JdbcAccessSecurityConfig securityConfig
+  )
   {
+    // We validate only the connection URL here because all properties are read only from the URL, except
+    // user and password. If we want to allow another way to specify user properties, such as using
+    // MetadataStorageConnectorConfig.getDbcpProperties(), those properties should be validated as well.
+    validateConfigs(connectorConfig.getConnectURI(), securityConfig);
     BasicDataSource dataSource = new BasicDataSource();
     dataSource.setUsername(connectorConfig.getUser());
     dataSource.setPassword(connectorConfig.getPassword());
@@ -75,6 +86,23 @@ public abstract class SQLFirehoseDatabaseConnector
     return dataSource;
   }
 
+  private void validateConfigs(String urlString, JdbcAccessSecurityConfig securityConfig)
+  {
+    if (Strings.isNullOrEmpty(urlString)) {
+      throw new IllegalArgumentException("connectURI cannot be null or empty");
+    }
+    if (!securityConfig.isEnforceAllowedProperties()) {
+      // Allow-list enforcement is disabled, so the URL properties are not checked.
+      return;
+    }
+    final Set<String> propertyKeyFromConnectURL = findPropertyKeysFromConnectURL(urlString);
+    ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
+        propertyKeyFromConnectURL,
+        securityConfig.getSystemPropertyPrefixes(),
+        securityConfig.getAllowedProperties()
+    );
+  }
+
   public String getValidationQuery()
   {
     return "SELECT 1";
@@ -82,5 +110,8 @@ public abstract class SQLFirehoseDatabaseConnector
 
   public abstract DBI getDBI();
 
-
+  /**
+   * Extract property keys from the given JDBC URL.
+   */
+  public abstract Set<String> findPropertyKeysFromConnectURL(String connectUri);
 }
diff --git a/server/src/main/java/org/apache/druid/server/initialization/ExternalStorageAccessSecurityModule.java b/server/src/main/java/org/apache/druid/server/initialization/ExternalStorageAccessSecurityModule.java
new file mode 100644
index 0000000..eda21ed
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/initialization/ExternalStorageAccessSecurityModule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.initialization;
+
+import com.fasterxml.jackson.databind.Module;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.List;
+
+public class ExternalStorageAccessSecurityModule implements DruidModule
+{
+
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+    JsonConfigProvider.bind(binder, "druid.access.jdbc", JdbcAccessSecurityConfig.class);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/initialization/JdbcAccessSecurityConfig.java b/server/src/main/java/org/apache/druid/server/initialization/JdbcAccessSecurityConfig.java
new file mode 100644
index 0000000..ba12ec0
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/initialization/JdbcAccessSecurityConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.initialization;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/**
+ * A config class that applies to all JDBC connections to other databases.
+ *
+ * @see org.apache.druid.utils.ConnectionUriUtils
+ */
+public class JdbcAccessSecurityConfig
+{
+  static final Set<String> DEFAULT_ALLOWED_PROPERTIES = ImmutableSet.of(
+      // MySQL
+      "useSSL",
+      "requireSSL",
+
+      // PostgreSQL
+      "ssl",
+      "sslmode"
+  );
+
+  /**
+   * Prefixes of the properties that can be added automatically by {@link java.sql.Driver} during
+   * connection URL parsing. Any properties resulting from connection URL parsing are regarded as
+   * system properties if they start with one of the prefixes in this set.
+   * Only the remaining non-system properties are checked against {@link #getAllowedProperties()}.
+   */
+  private static final Set<String> SYSTEM_PROPERTY_PREFIXES = ImmutableSet.of(
+      // MySQL
+      // There can be multiple host and port properties if multiple addresses are specified.
+      // The pattern of the property name is HOST.i and PORT.i where i is an integer.
+      "HOST",
+      "PORT",
+      "NUM_HOSTS",
+      "DBNAME",
+
+      // PostgreSQL
+      "PGHOST",
+      "PGPORT",
+      "PGDBNAME"
+  );
+
+  @JsonProperty
+  private Set<String> allowedProperties = DEFAULT_ALLOWED_PROPERTIES;
+
+  @JsonProperty
+  private boolean allowUnknownJdbcUrlFormat = true;
+
+  // Enforcing the allow list check can break rolling upgrades. This is not good for patch releases
+  // and is why this config was added. However, from the security point of view, this config
+  // should always be enabled in production to secure your cluster. As a result, this config
+  // is deprecated and will be removed in the near future.
+  @Deprecated
+  @JsonProperty
+  private boolean enforceAllowedProperties = false;
+
+  @JsonIgnore
+  public Set<String> getSystemPropertyPrefixes()
+  {
+    return SYSTEM_PROPERTY_PREFIXES;
+  }
+
+  public Set<String> getAllowedProperties()
+  {
+    return allowedProperties;
+  }
+
+  public boolean isAllowUnknownJdbcUrlFormat()
+  {
+    return allowUnknownJdbcUrlFormat;
+  }
+
+  public boolean isEnforceAllowedProperties()
+  {
+    return enforceAllowedProperties;
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
index 7afa888..3360346 100644
--- a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.commons.io.FileUtils;
@@ -41,6 +42,7 @@ import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
 import org.easymock.EasyMock;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -57,6 +59,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -221,7 +224,17 @@ public class SqlInputSourceTest
         @JsonProperty("connectorConfig") MetadataStorageConnectorConfig metadataStorageConnectorConfig
     )
     {
-      final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig);
+      final BasicDataSource datasource = getDatasource(
+          metadataStorageConnectorConfig,
+          new JdbcAccessSecurityConfig()
+          {
+            @Override
+            public Set<String> getAllowedProperties()
+            {
+              return ImmutableSet.of("user", "create");
+            }
+          }
+      );
       datasource.setDriverClassLoader(getClass().getClassLoader());
       datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver");
       this.dbi = new DBI(datasource);
@@ -258,5 +271,11 @@ public class SqlInputSourceTest
     {
       return dbi;
     }
+
+    @Override
+    public Set<String> findPropertyKeysFromConnectURL(String connectUri)
+    {
+      return ImmutableSet.of("user", "create");
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java b/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java
index 60e7c73..c21b5cc 100644
--- a/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java
+++ b/server/src/test/java/org/apache/druid/metadata/input/SqlTestUtils.java
@@ -21,16 +21,20 @@ package org.apache.druid.metadata.input;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
 import org.junit.Rule;
 import org.skife.jdbi.v2.Batch;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 
+import java.util.Set;
+
 public class SqlTestUtils
 {
   @Rule
@@ -55,7 +59,17 @@ public class SqlTestUtils
         @JsonProperty("connectorConfig") MetadataStorageConnectorConfig metadataStorageConnectorConfig, DBI dbi
     )
     {
-      final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig);
+      final BasicDataSource datasource = getDatasource(
+          metadataStorageConnectorConfig,
+          new JdbcAccessSecurityConfig()
+          {
+            @Override
+            public Set<String> getAllowedProperties()
+            {
+              return ImmutableSet.of("user", "create");
+            }
+          }
+      );
       datasource.setDriverClassLoader(getClass().getClassLoader());
       datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver");
       this.dbi = dbi;
@@ -66,6 +80,12 @@ public class SqlTestUtils
     {
       return dbi;
     }
+
+    @Override
+    public Set<String> findPropertyKeysFromConnectURL(String connectUri)
+    {
+      return ImmutableSet.of("user", "create");
+    }
   }
 
   public void createAndUpdateTable(final String tableName, int numEntries)
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java
index 189aa49..0f3b0c5 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactoryTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.segment.realtime.firehose;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
-import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.Row;
@@ -31,8 +30,6 @@ import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.metadata.MetadataStorageConnectorConfig;
-import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.metadata.input.SqlTestUtils;
 import org.apache.druid.segment.TestHelper;
@@ -42,7 +39,6 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
-import org.skife.jdbi.v2.DBI;
 
 import java.io.File;
 import java.io.IOException;
@@ -236,22 +232,4 @@ public class SqlFirehoseFactoryTest
     testUtils.dropTable(TABLE_NAME_2);
 
   }
-  private static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector
-  {
-    private final DBI dbi;
-
-    private TestDerbyFirehoseConnector(MetadataStorageConnectorConfig metadataStorageConnectorConfig, DBI dbi)
-    {
-      final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig);
-      datasource.setDriverClassLoader(getClass().getClassLoader());
-      datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver");
-      this.dbi = dbi;
-    }
-
-    @Override
-    public DBI getDBI()
-    {
-      return dbi;
-    }
-  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/initialization/ExternalStorageAccessSecurityModuleTest.java b/server/src/test/java/org/apache/druid/server/initialization/ExternalStorageAccessSecurityModuleTest.java
new file mode 100644
index 0000000..7f092dd
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/initialization/ExternalStorageAccessSecurityModuleTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.initialization;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.guice.LazySingleton;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.Validation;
+import javax.validation.Validator;
+import java.util.Properties;
+
+public class ExternalStorageAccessSecurityModuleTest
+{
+  @Test
+  public void testSecurityConfigDefault()
+  {
+    JdbcAccessSecurityConfig securityConfig = makeInjectorWithProperties(new Properties()).getInstance(
+        JdbcAccessSecurityConfig.class
+    );
+    Assert.assertNotNull(securityConfig);
+    Assert.assertEquals(
+        JdbcAccessSecurityConfig.DEFAULT_ALLOWED_PROPERTIES,
+        securityConfig.getAllowedProperties()
+    );
+    Assert.assertTrue(securityConfig.isAllowUnknownJdbcUrlFormat());
+    Assert.assertFalse(securityConfig.isEnforceAllowedProperties());
+  }
+
+  @Test
+  public void testSecurityConfigOverride()
+  {
+    Properties properties = new Properties();
+    properties.setProperty("druid.access.jdbc.allowedProperties", "[\"valid1\", \"valid2\", \"valid3\"]");
+    properties.setProperty("druid.access.jdbc.allowUnknownJdbcUrlFormat", "false");
+    properties.setProperty("druid.access.jdbc.enforceAllowedProperties", "true");
+    JdbcAccessSecurityConfig securityConfig = makeInjectorWithProperties(properties).getInstance(
+        JdbcAccessSecurityConfig.class
+    );
+    Assert.assertNotNull(securityConfig);
+    Assert.assertEquals(
+        ImmutableSet.of(
+            "valid1",
+            "valid2",
+            "valid3"
+        ),
+        securityConfig.getAllowedProperties()
+    );
+    Assert.assertFalse(securityConfig.isAllowUnknownJdbcUrlFormat());
+    Assert.assertTrue(securityConfig.isEnforceAllowedProperties());
+  }
+
+  private static Injector makeInjectorWithProperties(final Properties props)
+  {
+    return Guice.createInjector(
+        ImmutableList.of(
+            new DruidGuiceExtensions(),
+            binder -> {
+              binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+              binder.bind(JsonConfigurator.class).in(LazySingleton.class);
+              binder.bind(Properties.class).toInstance(props);
+            },
+            new ExternalStorageAccessSecurityModule()
+        )
+    );
+  }
+}
diff --git a/website/.spelling b/website/.spelling
index 5a4453e..d0f2686 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -356,6 +356,7 @@ reingest
 reingesting
 reingestion
 repo
+requireSSL
 rollup
 rollups
 rsync
@@ -371,6 +372,8 @@ sharding
 skipHeaderRows
 smooshed
 splittable
+ssl
+sslmode
 stdout
 storages
 stringified
@@ -404,6 +407,7 @@ unparseable
 unparsed
 unsetting
 useFilterCNF
+useSSL
 uptime
 uris
 useFieldDiscovery

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org