Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/07/02 16:11:38 UTC

[GitHub] [doris] stalary opened a new pull request, #10565: [feature-wip](multi-catalog) Support es datasource

stalary opened a new pull request, #10565:
URL: https://github.com/apache/doris/pull/10565

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem Summary:
   
   Support the es datasource. This PR adds support for querying index-related metadata.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] morningman commented on a diff in pull request #10565: [feature-wip](multi-catalog) Support es datasource

Posted by GitBox <gi...@apache.org>.
morningman commented on code in PR #10565:
URL: https://github.com/apache/doris/pull/10565#discussion_r923253097


##########
fe/fe-core/pom.xml:
##########
@@ -629,6 +629,7 @@ under the License.
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
+            <scope>provided</scope>

Review Comment:
   Are you sure that this dependency is provided?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java:
##########
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.EsTable;
+import org.apache.doris.datasource.EsExternalDataSource;
+import org.apache.doris.external.elasticsearch.EsUtil;
+import org.apache.doris.thrift.TTableDescriptor;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Elasticsearch external table.
+ */
+public class EsExternalTable extends ExternalTable {
+
+    private static final Logger LOG = LogManager.getLogger(EsExternalTable.class);
+
+    private final EsExternalDataSource ds;
+    private final String dbName;
+    private boolean initialized = false;
+
+    /**
+     * Create elasticsearch external table.
+     *
+     * @param id Table id.
+     * @param name Table name.
+     * @param dbName Database name.
+     * @param ds HMSExternalDataSource.
+     */
+    public EsExternalTable(long id, String name, String dbName, EsExternalDataSource ds) {
+        super(id, name);
+        this.dbName = dbName;
+        this.ds = ds;
+        this.type = TableType.ES_EXTERNAL_TABLE;
+    }
+
+
+    private synchronized void makeSureInitialized() {
+        if (!initialized) {
+            init();
+            initialized = true;
+        }
+    }
+
+    private void init() {
+        fullSchema = EsUtil.genColumnsFromEs(ds.getEsRestClient(), name, null);
+    }
+
+    @Override
+    public List<Column> getFullSchema() {
+        makeSureInitialized();
+        return fullSchema;
+    }
+
+    @Override
+    public List<Column> getBaseSchema() {
+        return getFullSchema();
+    }
+
+    @Override
+    public List<Column> getBaseSchema(boolean full) {
+        return getFullSchema();
+    }
+
+    @Override
+    public Column getColumn(String name) {
+        makeSureInitialized();
+        for (Column column : fullSchema) {
+            if (name.equals(column.getName())) {
+                return column;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public String getMysqlType() {
+        return type.name();
+    }
+
+    /**
+     * get database name of hms table.
+     */
+    public String getDbName() {
+        return dbName;
+    }
+
+    @Override
+    public TTableDescriptor toThrift() {
+        return super.toThrift();
+    }
+
+    public EsTable toEsTable() {
+        EsTable esTable = new EsTable(this.id, this.name, this.getFullSchema(), TableType.ES_EXTERNAL_TABLE, name, null,

Review Comment:
   I don't think we need to create a new `EsTable` every time we call `toEsTable()`.
   You can create it in `init`.
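
One way to read the suggestion above is to build the derived object once, inside the lazy initialization path, and hand back the cached instance on every later call. The sketch below only illustrates that pattern under stated assumptions: `CachedTable`, `LazyExternalTable`, and `toCachedTable()` are hypothetical stand-ins and are not the Doris `EsTable` or `EsExternalTable` classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an object that is costly to rebuild (not the Doris EsTable).
final class CachedTable {
    // Counts constructions so the caching effect can be observed.
    static final AtomicInteger CONSTRUCTED = new AtomicInteger();
    final String name;

    CachedTable(String name) {
        this.name = name;
        CONSTRUCTED.incrementAndGet();
    }
}

// Hypothetical stand-in for the external table; only the caching pattern is shown.
final class LazyExternalTable {
    private final String name;
    private boolean initialized = false;
    private CachedTable cached;

    LazyExternalTable(String name) {
        this.name = name;
    }

    private synchronized void makeSureInitialized() {
        if (!initialized) {
            init();
            initialized = true;
        }
    }

    private void init() {
        // Build the derived object once, together with the rest of the lazy state.
        cached = new CachedTable(name);
    }

    CachedTable toCachedTable() {
        makeSureInitialized();
        return cached;
    }
}

final class LazyInitSketch {
    public static void main(String[] args) {
        LazyExternalTable table = new LazyExternalTable("my_index");
        CachedTable first = table.toCachedTable();
        CachedTable second = table.toCachedTable();
        // prints true: both calls return the same instance
        System.out.println(first == second);
    }
}
```

Because the field is assigned inside the synchronized `makeSureInitialized()` and every reader calls that method first, callers on other threads see the fully built instance.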



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java:
##########
@@ -18,43 +18,175 @@
 package org.apache.doris.datasource;
 
 
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.external.EsExternalDatabase;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.external.elasticsearch.EsRestClient;
+import org.apache.doris.external.elasticsearch.EsUtil;
+
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 /**
  * External data source for elasticsearch
  */
+@Getter
 public class EsExternalDataSource extends ExternalDataSource {
+
+    private static final Logger LOG = LogManager.getLogger(EsExternalDataSource.class);
+
+    private static final String PROP_HOSTS = "elasticsearch.hosts";
+    private static final String PROP_USERNAME = "elasticsearch.username";
+    private static final String PROP_PASSWORD = "elasticsearch.password";
+    private static final String PROP_DOC_VALUE_SCAN = "elasticsearch.doc_value_scan";
+    private static final String PROP_KEYWORD_SNIFF = "elasticsearch.keyword_sniff";
+    private static final String PROP_NODES_DISCOVERY = "elasticsearch.nodes_discovery";
+    private static final String PROP_SSL = "elasticsearch.ssl";
+
+    // Cache of db name to db id.
+    private Map<String, Long> dbNameToId;
+    private Map<Long, EsExternalDatabase> idToDb;
+
+    private EsRestClient esRestClient;
+
+    private boolean initialized = false;
+
+    private String[] nodes;
+
+    private String username;
+
+    private String password;
+
+    private boolean enableDocValueScan;
+
+    private boolean enableKeywordSniff;
+
+    private boolean enableSsl;
+
+    private boolean enableNodesDiscovery;
+
     /**
      * Default constructor for EsExternalDataSource.
      */
-    public EsExternalDataSource(long catalogId, String name, Map<String, String> props) {
+    public EsExternalDataSource(long catalogId, String name, Map<String, String> props) throws DdlException {
         this.id = catalogId;
-        setName(name);
-        getDsProperty().setProperties(props);
-        setType("es");
+        this.name = name;
+        this.type = "es";
+        validate(props);
+        this.dsProperty = new DataSourceProperty();
+        this.dsProperty.setProperties(props);
+    }
+
+    private void validate(Map<String, String> properties) throws DdlException {
+        if (properties == null) {
+            throw new DdlException(
+                    "Please set properties of elasticsearch table, " + "they are: hosts, user, password, index");
+        }
+
+        if (StringUtils.isBlank(properties.get(PROP_HOSTS))) {
+            throw new DdlException("Hosts of ES table is null.");
+        }
+        nodes = properties.get(PROP_HOSTS).trim().split(",");
+
+        if (StringUtils.isNotBlank(properties.get(PROP_USERNAME))) {
+            username = properties.get(PROP_USERNAME).trim();
+        }
+
+        if (StringUtils.isNotBlank(properties.get(PROP_PASSWORD))) {
+            password = properties.get(PROP_PASSWORD).trim();
+        }
+
+        if (properties.containsKey(PROP_DOC_VALUE_SCAN)) {
+            enableDocValueScan = EsUtil.getBoolean(properties, PROP_DOC_VALUE_SCAN);
+        }
+
+        if (properties.containsKey(PROP_KEYWORD_SNIFF)) {
+            enableKeywordSniff = EsUtil.getBoolean(properties, PROP_KEYWORD_SNIFF);
+        }
+
+        if (properties.containsKey(PROP_NODES_DISCOVERY)) {
+            enableNodesDiscovery = EsUtil.getBoolean(properties, PROP_NODES_DISCOVERY);
+        }
+
+        if (properties.containsKey(PROP_SSL)) {
+            enableSsl = EsUtil.getBoolean(properties, PROP_SSL);
+            // check protocol
+            for (String seed : nodes) {
+                if (enableSsl && seed.startsWith("http://")) {
+                    throw new DdlException("if ssl_enabled is true, the https protocol must be used");
+                }
+                if (!enableSsl && seed.startsWith("https://")) {
+                    throw new DdlException("if ssl_enabled is false, the http protocol must be used");
+                }
+            }
+        }
+    }
+
+    /**
+     * Datasource can't be init when creating because the external datasource may depend on third system.
+     * So you have to make sure the client of third system is initialized before any method was called.
+     */
+    private synchronized void makeSureInitialized() {
+        if (!initialized) {
+            init();
+            initialized = true;
+        }
+    }
+
+    private void init() {
+        try {
+            validate(this.dsProperty.getProperties());
+        } catch (DdlException e) {
+            LOG.warn("validate error", e);
+        }
+        dbNameToId = Maps.newConcurrentMap();
+        idToDb = Maps.newConcurrentMap();
+        this.esRestClient = new EsRestClient(this.nodes, this.username, this.password, this.enableSsl);
+        long defaultDbId = Catalog.getCurrentCatalog().getNextId();
+        dbNameToId.put("default", defaultDbId);

Review Comment:
   Define a `static final` field for this "default" string.
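
As a minimal sketch of the point above, the literal can be given a single named definition that every lookup and comparison reuses. The class name `DefaultDbNameSketch`, the constant name `DEFAULT_DB`, and the helper methods are assumptions made for illustration; the PR does not state which names were eventually chosen.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; class, constant, and method names are illustrative only.
final class DefaultDbNameSketch {
    // Single definition of the implicit database name.
    static final String DEFAULT_DB = "default";

    private final Map<String, Long> dbNameToId = new ConcurrentHashMap<>();

    DefaultDbNameSketch(long defaultDbId) {
        // Register the implicit database under the shared constant.
        dbNameToId.put(DEFAULT_DB, defaultDbId);
    }

    // Returns the id for a database name, or null if the name is unknown.
    Long getDbId(String dbName) {
        return dbNameToId.get(dbName);
    }

    // Constant-first comparison, so a null argument simply yields false.
    boolean isDefaultDb(String dbName) {
        return DEFAULT_DB.equals(dbName);
    }
}
```

With the constant in place, a typo in the name becomes a compile error instead of a silent lookup miss.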





[GitHub] [doris] stalary commented on pull request #10565: [feature-wip](multi-catalog) Support es datasource

Posted by GitBox <gi...@apache.org>.
stalary commented on PR #10565:
URL: https://github.com/apache/doris/pull/10565#issuecomment-1186707195

   Currently, this only works with simple types on es7/8; es6 and array types still need to be developed later.




[GitHub] [doris] github-actions[bot] commented on pull request #10565: [feature](multi-catalog) Support es datasource

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #10565:
URL: https://github.com/apache/doris/pull/10565#issuecomment-1196570551

   PR approved by at least one committer and no changes requested.




[GitHub] [doris] morningman merged pull request #10565: [feature](multi-catalog) Support es datasource

Posted by GitBox <gi...@apache.org>.
morningman merged PR #10565:
URL: https://github.com/apache/doris/pull/10565




[GitHub] [doris] morningman commented on a diff in pull request #10565: [feature-wip](multi-catalog) Support es datasource

Posted by GitBox <gi...@apache.org>.
morningman commented on code in PR #10565:
URL: https://github.com/apache/doris/pull/10565#discussion_r912821243


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java:
##########
@@ -18,42 +18,131 @@
 package org.apache.doris.datasource;
 
 
+import org.apache.doris.common.DdlException;
+import org.apache.doris.external.elasticsearch.EsRestClient;
+import org.apache.doris.external.elasticsearch.EsUtil;
+
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * External data source for elasticsearch
  */
 public class EsExternalDataSource extends ExternalDataSource {
+
+    private static final Logger LOG = LogManager.getLogger(HMSExternalDataSource.class);
+
+    private static final AtomicLong nextId = new AtomicLong(0);
+
+    private static final String PROP_HOSTS = "elasticsearch.hosts";
+    private static final String PROP_USERNAME = "elasticsearch.username";
+    private static final String PROP_PASSWORD = "elasticsearch.password";
+    private static final String PROP_DOC_VALUE_SCAN = "elasticsearch.doc_value_scan";
+    private static final String PROP_KEYWORD_SNIFF = "elasticsearch.keyword_sniff";
+    private static final String PROP_NODES_DISCOVERY = "elasticsearch.nodes_discovery";
+    private static final String PROP_SSL = "elasticsearch.ssl";
+
+    //Cache of db name to db id.
+    private ConcurrentHashMap<String, Long> dbNameToId = new ConcurrentHashMap();
+
+    private EsRestClient esRestClient;
+
+    private String[] nodes;
+
+    private String username;
+
+    private String password;
+
+    private boolean enableDocValueScan;
+
+    private boolean enableKeywordSniff;
+
+    private boolean enableSsl;
+
+    private boolean enableNodesDiscovery;
+
     /**
      * Default constructor for EsExternalDataSource.
      */
-    public EsExternalDataSource(String name, Map<String, String> props) {
+    public EsExternalDataSource(String name, Map<String, String> props) throws DdlException {
         setName(name);
         getDsProperty().setProperties(props);
         setType("es");
+        validate(props);
+        dbNameToId.put("user", nextId.incrementAndGet());
+        dbNameToId.put("system", nextId.incrementAndGet());

Review Comment:
   The dbId should be a globally unique ID, so we need to use Catalog.getNextId().
   Also, add a comment to explain these 2 databases.
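
The concern above can be illustrated with a small sketch: counters that are private to one component each start from the same value, so the IDs they hand out can collide, whereas a single shared generator cannot repeat a value. `LocalIdSource` and `GlobalIdGenerator` are hypothetical stand-ins written for this example; `GlobalIdGenerator` only imitates the role of the catalog-wide ID source the reviewer mentions and is not the Doris `Catalog` class.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a counter kept privately by one component.
final class LocalIdSource {
    private final AtomicLong nextId = new AtomicLong(0);

    long nextLocalId() {
        return nextId.incrementAndGet();
    }
}

// Hypothetical stand-in for a process-wide generator shared by every component.
final class GlobalIdGenerator {
    private static final AtomicLong NEXT_ID = new AtomicLong(0);

    private GlobalIdGenerator() {
    }

    static long getNextId() {
        return NEXT_ID.incrementAndGet();
    }
}

final class UniqueIdSketch {
    public static void main(String[] args) {
        // Two independent counters both start at 0, so their first IDs are equal.
        long a = new LocalIdSource().nextLocalId();
        long b = new LocalIdSource().nextLocalId();
        System.out.println("local ids collide: " + (a == b));

        // The shared generator never returns the same value twice.
        long g1 = GlobalIdGenerator.getNextId();
        long g2 = GlobalIdGenerator.getNextId();
        System.out.println("global ids collide: " + (g1 == g2));
    }
}
```

This sketch does not cover persistence: an in-memory counter also restarts after a process restart, which is a further reason to rely on the system's own ID allocator.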





[GitHub] [doris] morningman commented on a diff in pull request #10565: [feature-wip](multi-catalog) Support es datasource

Posted by GitBox <gi...@apache.org>.
morningman commented on code in PR #10565:
URL: https://github.com/apache/doris/pull/10565#discussion_r930768575


##########
fe/fe-core/pom.xml:
##########
@@ -624,6 +624,7 @@ under the License.
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
+            <scope>provided</scope>

Review Comment:
   remove this





[GitHub] [doris] github-actions[bot] commented on pull request #10565: [feature](multi-catalog) Support es datasource

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #10565:
URL: https://github.com/apache/doris/pull/10565#issuecomment-1196570590

   PR approved by anyone and no changes requested.

