You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2024/04/19 01:30:47 UTC

(doris) branch master updated: [feature](merge-cloud) Add `BrokerLoad` and `DriverUrl` white list (#33003)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 037cf12ad5e [feature](merge-cloud) Add `BrokerLoad` and `DriverUrl` white list (#33003)
037cf12ad5e is described below

commit 037cf12ad5e060382df719858399c8d0f84cc6fc
Author: Lei Zhang <27...@users.noreply.github.com>
AuthorDate: Fri Apr 19 09:30:39 2024 +0800

    [feature](merge-cloud) Add `BrokerLoad` and `DriverUrl` white list (#33003)
---
 .../main/java/org/apache/doris/common/Config.java  | 15 +++++-
 .../java/org/apache/doris/analysis/LoadStmt.java   | 59 ++++++++++++++++++++++
 .../org/apache/doris/catalog/JdbcResource.java     | 38 ++++++++------
 3 files changed, 95 insertions(+), 17 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index efe0285f5ee..aa1b1f501cf 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -144,11 +144,11 @@ public class Config extends ConfigBase {
     public static String jdbc_drivers_dir = System.getenv("DORIS_HOME") + "/jdbc_drivers";
 
     @ConfField(description = {"JDBC 驱动的安全路径。在创建 JDBC Catalog 时,允许使用的文件或者网络路径,可配置多个,使用分号分隔"
-            + "默认为 * 全部允许,如果设置为空责全部不允许",
+            + "默认为 * 表示全部允许,如果设置为空也表示全部允许",
             "The safe path of the JDBC driver. When creating a JDBC Catalog,"
                     + "you can configure multiple files or network paths that are allowed to be used,"
                     + "separated by semicolons"
-                    + "The default is * to allow all, if set to empty, all are not allowed"})
+                    + "The default is * to allow all, if set to empty, also means to allow all"})
     public static String jdbc_driver_secure_path = "*";
 
     @ConfField(mutable = true, masterOnly = true, description = {"broker load 时,单个节点上 load 执行计划的默认并行度",
@@ -2713,6 +2713,17 @@ public class Config extends ConfigBase {
 
     @ConfField(mutable = true)
     public static int mow_insert_into_commit_retry_times = 10;
+
+    @ConfField(mutable = true, description = {"指定S3 Load endpoint白名单, 举例: s3_load_endpoint_white_list=a,b,c",
+            "the white list for the s3 load endpoint, if it is empty, no white list will be set,"
+            + " for example: s3_load_endpoint_white_list=a,b,c"})
+    public static String[] s3_load_endpoint_white_list = {};
+
+    @ConfField(mutable = true, description = {"指定Jdbc driver url白名单, 举例: jdbc_driver_url_white_list=a,b,c",
+            "the white list for jdbc driver url, if it is empty, no white list will be set,"
+            + " for example: jdbc_driver_url_white_list=a,b,c"
+    })
+    public static String[] jdbc_driver_url_white_list = {};
     //==========================================================================
     //                      end of cloud config
     //==========================================================================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 19b72129516..b658dd16faf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -21,12 +21,14 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.security.SecurityChecker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -36,10 +38,16 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -73,6 +81,8 @@ import java.util.Map.Entry;
 //          WITH RESOURCE name
 //          (key3=value3, ...)
 public class LoadStmt extends DdlStmt {
+    private static final Logger LOG = LogManager.getLogger(LoadStmt.class);
+
     public static final String TIMEOUT_PROPERTY = "timeout";
     public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
     public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
@@ -92,6 +102,10 @@ public class LoadStmt extends DdlStmt {
     public static final String BOS_ACCESSKEY = "bos_accesskey";
     public static final String BOS_SECRET_ACCESSKEY = "bos_secret_accesskey";
 
+    // for S3 load check
+    public static final List<String> PROVIDERS =
+            new ArrayList<>(Arrays.asList("cos", "oss", "s3", "obs", "bos"));
+
     // mini load params
     public static final String KEY_IN_PARAM_COLUMNS = "columns";
     public static final String KEY_IN_PARAM_SET = "set";
@@ -485,6 +499,7 @@ public class LoadStmt extends DdlStmt {
             }
         } else if (brokerDesc != null) {
             etlJobType = EtlJobType.BROKER;
+            checkWhiteList();
         } else if (isMysqlLoad) {
             etlJobType = EtlJobType.LOCAL_FILE;
         } else {
@@ -556,4 +571,48 @@ public class LoadStmt extends DdlStmt {
             return RedirectStatus.FORWARD_WITH_SYNC;
         }
     }
+
+    // Test-connects to http://<endpoint> (10s connect timeout) between start/stopSSRFChecking; throws on failure.
+    private void checkEndpoint(String endpoint) throws UserException {
+        HttpURLConnection connection = null;
+        try {
+            String urlStr = "http://" + endpoint;
+            SecurityChecker.getInstance().startSSRFChecking(urlStr);
+            connection = (HttpURLConnection) new URL(urlStr).openConnection();
+            connection.setConnectTimeout(10000);
+            connection.connect();
+        } catch (Exception e) {
+            LOG.warn("Failed to connect endpoint={}", endpoint, e);
+            throw new UserException("Failed to connect endpoint=" + endpoint + ", reason: " + e.getMessage());
+        } finally {
+            if (connection != null) {
+                try {
+                    connection.disconnect();
+                } catch (Exception e) {
+                    LOG.warn("Failed to disconnect connection, endpoint={}", endpoint, e);
+                }
+            }
+            SecurityChecker.getInstance().stopSSRFChecking();
+        }
+    }
+
+    // For S3-style broker loads, enforces s3_load_endpoint_white_list and probes the endpoint; otherwise no-op.
+    public void checkWhiteList() throws UserException {
+        Map<String, String> props = brokerDesc.getProperties();
+        if (!props.containsKey(S3Properties.Env.ENDPOINT) || !props.containsKey(S3Properties.Env.ACCESS_KEY)
+                || !props.containsKey(S3Properties.Env.SECRET_KEY) || !props.containsKey(S3Properties.Env.REGION)) {
+            return;
+        }
+        // Drop a leading http:// or https:// so the stored endpoint and the white list compare scheme-less.
+        String endpoint = props.get(S3Properties.Env.ENDPOINT)
+                .replaceFirst("^http://", "").replaceFirst("^https://", "");
+        List<String> allowed = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
+        allowed.removeIf(String::isEmpty);
+        if (!allowed.isEmpty() && !allowed.contains(endpoint)) {
+            throw new UserException("endpoint: " + endpoint
+                + " is not in s3 load endpoint white list: " + String.join(",", allowed));
+        }
+        props.put(S3Properties.Env.ENDPOINT, endpoint);
+        checkEndpoint(endpoint);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
index 65bf7d308ff..60e9f0abc17 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
@@ -42,10 +42,11 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
-
 /**
  * External JDBC Catalog resource for external table query.
  * <p>
@@ -285,28 +286,35 @@ public class JdbcResource extends Resource {
         }
     }
 
+    private static void checkCloudWhiteList(String driverUrl) throws IllegalArgumentException {
+        // Cloud-mode white list: if `jdbc_driver_url_white_list` has any non-empty entry, driverUrl must
+        // equal one of them exactly; an empty list allows all. `jdbc_driver_secure_path` is checked separately.
+        List<String> cloudWhiteList = new ArrayList<>(Arrays.asList(Config.jdbc_driver_url_white_list));
+        cloudWhiteList.removeIf(String::isEmpty);
+        if (!cloudWhiteList.isEmpty() && !cloudWhiteList.contains(driverUrl)) {
+            throw new IllegalArgumentException("Driver URL does not match any allowed paths: " + driverUrl);
+        }
+    }
+
     public static String getFullDriverUrl(String driverUrl) throws IllegalArgumentException {
         try {
             URI uri = new URI(driverUrl);
             String schema = uri.getScheme();
+            checkCloudWhiteList(driverUrl);
             if (schema == null && !driverUrl.startsWith("/")) {
                 return "file://" + Config.jdbc_drivers_dir + "/" + driverUrl;
-            } else {
-                if ("*".equals(Config.jdbc_driver_secure_path)) {
-                    return driverUrl;
-                } else if (Config.jdbc_driver_secure_path.trim().isEmpty()) {
-                    throw new IllegalArgumentException(
-                            "jdbc_driver_secure_path is set to empty, disallowing all driver URLs.");
-                } else {
-                    boolean isAllowed = Arrays.stream(Config.jdbc_driver_secure_path.split(";"))
+            }
+
+            if ("*".equals(Config.jdbc_driver_secure_path)) {
+                return driverUrl;
+            }
+
+            boolean isAllowed = Arrays.stream(Config.jdbc_driver_secure_path.split(";"))
                             .anyMatch(allowedPath -> driverUrl.startsWith(allowedPath.trim()));
-                    if (!isAllowed) {
-                        throw new IllegalArgumentException("Driver URL does not match any allowed paths: " + driverUrl);
-                    } else {
-                        return driverUrl;
-                    }
-                }
+            if (!isAllowed) {
+                throw new IllegalArgumentException("Driver URL does not match any allowed paths: " + driverUrl);
             }
+            return driverUrl;
         } catch (URISyntaxException e) {
             LOG.warn("invalid jdbc driver url: " + driverUrl);
             return driverUrl;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org