You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/27 15:16:23 UTC
[doris] branch master updated: [feature](multi-catalog) Support es datasource (#10565)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 87b1f4c071 [feature](multi-catalog) Support es datasource (#10565)
87b1f4c071 is described below
commit 87b1f4c0719468e56f8f06641cba4b1a7a43578c
Author: Stalary <st...@163.com>
AuthorDate: Wed Jul 27 23:16:17 2022 +0800
[feature](multi-catalog) Support es datasource (#10565)
---
fe/fe-core/pom.xml | 1 +
.../main/java/org/apache/doris/catalog/Env.java | 15 ++-
.../java/org/apache/doris/catalog/EsTable.java | 144 ++------------------
.../java/org/apache/doris/catalog/TableIf.java | 5 +-
.../doris/catalog/external/EsExternalDatabase.java | 91 +++++++++++++
.../doris/catalog/external/EsExternalTable.java | 141 +++++++++++++++++++
.../org/apache/doris/common/util/JsonUtil.java | 61 +++++++++
.../apache/doris/datasource/CatalogFactory.java | 6 +-
.../org/apache/doris/datasource/DataSourceMgr.java | 5 +-
.../doris/datasource/EsExternalDataSource.java | 150 +++++++++++++++++++--
.../external/elasticsearch/EsMetaStateTracker.java | 6 +-
.../doris/external/elasticsearch/EsRestClient.java | 148 +++++++++++---------
.../doris/external/elasticsearch/EsUtil.java | 48 ++++++-
.../doris/external/elasticsearch/VersionPhase.java | 55 --------
.../java/org/apache/doris/planner/EsScanNode.java | 23 +++-
.../apache/doris/planner/SingleNodePlanner.java | 3 +
.../external/elasticsearch/VersionPhaseTest.java | 55 --------
17 files changed, 623 insertions(+), 334 deletions(-)
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 44eabcce13..8e7d4552b6 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -624,6 +624,7 @@ under the License.
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index d2033b70ff..835cfbce2c 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -90,6 +90,7 @@ import org.apache.doris.clone.TabletScheduler;
import org.apache.doris.clone.TabletSchedulerStat;
import org.apache.doris.cluster.BaseParam;
import org.apache.doris.cluster.Cluster;
+import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
@@ -118,6 +119,7 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.consistency.ConsistencyChecker;
import org.apache.doris.datasource.DataSourceIf;
import org.apache.doris.datasource.DataSourceMgr;
+import org.apache.doris.datasource.EsExternalDataSource;
import org.apache.doris.datasource.InternalDataSource;
import org.apache.doris.deploy.DeployManager;
import org.apache.doris.deploy.impl.AmbariDeployManager;
@@ -2983,9 +2985,9 @@ public class Env {
if (esTable.getMappingType() != null) {
sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n");
}
- sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n");
- sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n");
- sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\",\n");
+ sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isEnableDocValueScan()).append("\",\n");
+ sb.append("\"max_docvalue_fields\" = \"").append(esTable.getMaxDocValueFields()).append("\",\n");
+ sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isEnableKeywordSniff()).append("\",\n");
sb.append("\"nodes_discovery\" = \"").append(esTable.isNodesDiscovery()).append("\",\n");
sb.append("\"http_ssl_enabled\" = \"").append(esTable.isHttpSslEnabled()).append("\"\n");
sb.append(")");
@@ -4184,11 +4186,16 @@ public class Env {
// Switch catalog of this sesseion.
public void changeCatalog(ConnectContext ctx, String catalogName) throws DdlException {
- if (dataSourceMgr.getCatalogNullable(catalogName) == null) {
+ DataSourceIf dataSourceIf = dataSourceMgr.getCatalogNullable(catalogName);
+ if (dataSourceIf == null) {
throw new DdlException(ErrorCode.ERR_UNKNOWN_CATALOG.formatErrorMsg(catalogName),
ErrorCode.ERR_UNKNOWN_CATALOG);
}
ctx.changeDefaultCatalog(catalogName);
+ if (dataSourceIf instanceof EsExternalDataSource) {
+ ctx.setDatabase(SystemInfoService.DEFAULT_CLUSTER + ClusterNamespace.CLUSTER_DELIMITER
+ + EsExternalDataSource.DEFAULT_DB);
+ }
}
// Change current database of this session.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
index ba39f2d4d2..93dd2bc2bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -19,7 +19,6 @@ package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
-import org.apache.doris.external.elasticsearch.EsMajorVersion;
import org.apache.doris.external.elasticsearch.EsMetaStateTracker;
import org.apache.doris.external.elasticsearch.EsRestClient;
import org.apache.doris.external.elasticsearch.EsTablePartitions;
@@ -28,16 +27,16 @@ import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.json.simple.JSONObject;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -48,6 +47,8 @@ import java.util.Set;
/**
* Elasticsearch table.
**/
+@Getter
+@Setter
public class EsTable extends Table {
public static final Set<String> DEFAULT_DOCVALUE_DISABLED_FIELDS = new HashSet<>(Arrays.asList("text"));
@@ -84,10 +85,6 @@ public class EsTable extends Table {
// we also provide configurable parameters for expert-using
// @see `MAX_DOCVALUE_FIELDS`
private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;
-
- // version would be used to be compatible with different ES Cluster
- public EsMajorVersion majorVersion = null;
-
private String hosts;
private String[] seeds;
private String userName = "";
@@ -143,14 +140,18 @@ public class EsTable extends Table {
/**
* Create table for test.
**/
- public EsTable(long id, String name, List<Column> schema,
- Map<String, String> properties, PartitionInfo partitionInfo) throws DdlException {
+ public EsTable(long id, String name, List<Column> schema, Map<String, String> properties,
+ PartitionInfo partitionInfo) throws DdlException {
super(id, name, TableType.ELASTICSEARCH, schema);
this.partitionInfo = partitionInfo;
validate(properties);
this.client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
}
+ public EsTable(long id, String name, List<Column> schema, TableType tableType) {
+ super(id, name, tableType, schema);
+ }
+
public Map<String, String> fieldsContext() {
return esMetaStateTracker.searchContext().fetchFieldsContext();
}
@@ -159,26 +160,6 @@ public class EsTable extends Table {
return esMetaStateTracker.searchContext().docValueFieldsContext();
}
- public int maxDocValueFields() {
- return maxDocValueFields;
- }
-
- public boolean isDocValueScanEnable() {
- return enableDocValueScan;
- }
-
- public boolean isKeywordSniffEnable() {
- return enableKeywordSniff;
- }
-
- public boolean isNodesDiscovery() {
- return nodesDiscovery;
- }
-
- public boolean isHttpSslEnabled() {
- return httpSslEnabled;
- }
-
private void validate(Map<String, String> properties) throws DdlException {
if (properties == null) {
throw new DdlException(
@@ -199,24 +180,11 @@ public class EsTable extends Table {
}
if (StringUtils.isBlank(properties.get(INDEX))) {
- throw new DdlException("Index of ES table is null. "
- + "Please add properties('index'='xxxx') when create table");
+ throw new DdlException(
+ "Index of ES table is null. " + "Please add properties('index'='xxxx') when create table");
}
indexName = properties.get(INDEX).trim();
- // Explicit setting for cluster version to avoid detecting version failure
- if (properties.containsKey(VERSION)) {
- try {
- majorVersion = EsMajorVersion.parse(properties.get(VERSION).trim());
- if (majorVersion.before(EsMajorVersion.V_5_X)) {
- throw new DdlException("Unsupported/Unknown ES Cluster version [" + properties.get(VERSION) + "] ");
- }
- } catch (Exception e) {
- throw new DdlException("fail to parse ES major version, version= " + properties.get(VERSION).trim()
- + ", should be like '6.5.3' ");
- }
- }
-
// enable doc value scan for Elasticsearch
if (properties.containsKey(DOC_VALUE_SCAN)) {
enableDocValueScan = EsUtil.getBoolean(properties, DOC_VALUE_SCAN);
@@ -264,9 +232,6 @@ public class EsTable extends Table {
if (mappingType != null) {
tableContext.put("mappingType", mappingType);
}
- if (majorVersion != null) {
- tableContext.put("majorVersion", majorVersion.toString());
- }
tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff));
tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields));
@@ -334,13 +299,6 @@ public class EsTable extends Table {
passwd = tableContext.get("passwd");
indexName = tableContext.get("indexName");
mappingType = tableContext.get("mappingType");
- if (tableContext.containsKey("majorVersion")) {
- try {
- majorVersion = EsMajorVersion.parse(tableContext.get("majorVersion"));
- } catch (Exception e) {
- majorVersion = EsMajorVersion.V_5_X;
- }
- }
enableDocValueScan = Boolean.parseBoolean(tableContext.get("enableDocValueScan"));
if (tableContext.containsKey("enableKeywordSniff")) {
@@ -376,58 +334,6 @@ public class EsTable extends Table {
client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
}
- public String getHosts() {
- return hosts;
- }
-
- public String[] getSeeds() {
- return seeds;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public String getPasswd() {
- return passwd;
- }
-
- public String getIndexName() {
- return indexName;
- }
-
- public String getMappingType() {
- return mappingType;
- }
-
- public PartitionInfo getPartitionInfo() {
- return partitionInfo;
- }
-
- public EsTablePartitions getEsTablePartitions() {
- return esTablePartitions;
- }
-
- public void setEsTablePartitions(EsTablePartitions esTablePartitions) {
- this.esTablePartitions = esTablePartitions;
- }
-
- public EsMajorVersion esVersion() {
- return majorVersion;
- }
-
- public Throwable getLastMetaDataSyncException() {
- return lastMetaDataSyncException;
- }
-
- public void setLastMetaDataSyncException(Throwable lastMetaDataSyncException) {
- this.lastMetaDataSyncException = lastMetaDataSyncException;
- }
-
- public void setPartitionInfo(PartitionInfo partitionInfo) {
- this.partitionInfo = partitionInfo;
- }
-
/**
* Sync es index meta from remote ES Cluster.
*/
@@ -440,35 +346,13 @@ public class EsTable extends Table {
this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions();
} catch (Throwable e) {
LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster."
- + "table id: {}, err: {}", this.name, this.id, e.getMessage());
+ + "table id: {}, err: ", this.name, this.id, e);
this.esTablePartitions = null;
this.lastMetaDataSyncException = e;
}
}
- /**
- * Generate columns from ES Cluster.
- **/
public List<Column> genColumnsFromEs() {
- String mapping = client.getMapping(indexName);
- JSONObject mappingProps = EsUtil.getMappingProps(indexName, mapping, mappingType);
- Set<String> keys = (Set<String>) mappingProps.keySet();
- List<Column> columns = new ArrayList<>();
- for (String key : keys) {
- JSONObject field = (JSONObject) mappingProps.get(key);
- // Complex types are not currently supported.
- if (field.containsKey("type")) {
- Type type = EsUtil.toDorisType(field.get("type").toString());
- if (!type.isInvalid()) {
- Column column = new Column();
- column.setName(key);
- column.setType(type);
- column.setIsKey(true);
- column.setIsAllowNull(true);
- columns.add(column);
- }
- }
- }
- return columns;
+ return EsUtil.genColumnsFromEs(client, indexName, mappingType);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index b0066b28aa..9b7b1bb32b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -100,7 +100,7 @@ public interface TableIf {
*/
public enum TableType {
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI,
- TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE;
+ TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE;
public String toEngineName() {
switch (this) {
@@ -128,6 +128,8 @@ public interface TableIf {
return "Table_Valued_Function";
case HMS_EXTERNAL_TABLE:
return "hms";
+ case ES_EXTERNAL_TABLE:
+ return "es";
default:
return null;
}
@@ -150,6 +152,7 @@ public interface TableIf {
case HUDI:
case TABLE_VALUED_FUNCTION:
case HMS_EXTERNAL_TABLE:
+ case ES_EXTERNAL_TABLE:
return "EXTERNAL TABLE";
default:
return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
new file mode 100644
index 0000000000..dd90844b6e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.datasource.EsExternalDataSource;
+import org.apache.doris.datasource.ExternalDataSource;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * External database exposing the indexes of an Elasticsearch cluster as tables.
+ */
+public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> {
+
+    private static final Logger LOG = LogManager.getLogger(EsExternalDatabase.class);
+
+    // Cache of table name to table id; populated once by init().
+    private final Map<String, Long> tableNameToId = Maps.newConcurrentMap();
+    // Cache of table id to table object; populated once by init().
+    private final Map<Long, EsExternalTable> idToTbl = Maps.newHashMap();
+
+    /**
+     * Create Elasticsearch external database.
+     *
+     * @param extDataSource External data source this database belongs to.
+     * @param id database id.
+     * @param name database name.
+     */
+    public EsExternalDatabase(ExternalDataSource extDataSource, long id, String name) {
+        super(extDataSource, id, name);
+        init();
+    }
+
+    private void init() {
+        List<String> tableNames = extDataSource.listTableNames(null, name);
+        if (tableNames != null) {
+            for (String tableName : tableNames) {
+                long tblId = Env.getCurrentEnv().getNextId();
+                tableNameToId.put(tableName, tblId);
+                idToTbl.put(tblId, new EsExternalTable(tblId, tableName, name, (EsExternalDataSource) extDataSource));
+            }
+        }
+    }
+
+    @Override
+    public Set<String> getTableNamesWithLock() {
+        // No lock needed: names are re-fetched from the data source on every call, not read from the cache.
+        List<String> tableNames = extDataSource.listTableNames(null, name);
+        return tableNames == null ? new HashSet<>() : new HashSet<>(tableNames);
+    }
+
+    @Override
+    public List<EsExternalTable> getTables() {
+        return new ArrayList<>(idToTbl.values());
+    }
+
+    @Override
+    public EsExternalTable getTableNullable(String tableName) {
+        Long tblId = tableNameToId.get(tableName);
+        return tblId == null ? null : idToTbl.get(tblId);
+    }
+
+    @Override
+    public EsExternalTable getTableNullable(long tableId) {
+        return idToTbl.get(tableId);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
new file mode 100644
index 0000000000..8852dac22a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.EsTable;
+import org.apache.doris.datasource.EsExternalDataSource;
+import org.apache.doris.external.elasticsearch.EsUtil;
+import org.apache.doris.thrift.TEsTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Elasticsearch external table.
+ */
+public class EsExternalTable extends ExternalTable {
+
+    private static final Logger LOG = LogManager.getLogger(EsExternalTable.class);
+
+    private final EsExternalDataSource ds;
+    private final String dbName;
+    private boolean initialized = false;
+    private EsTable esTable;
+
+    /**
+     * Create elasticsearch external table.
+     *
+     * @param id Table id.
+     * @param name Table name (also used as the Elasticsearch index name).
+     * @param dbName Database name.
+     * @param ds EsExternalDataSource this table belongs to.
+     */
+    public EsExternalTable(long id, String name, String dbName, EsExternalDataSource ds) {
+        super(id, name);
+        this.dbName = dbName;
+        this.ds = ds;
+        this.type = TableType.ES_EXTERNAL_TABLE;
+    }
+
+    // Initialized lazily: init() contacts the ES cluster through the data source's REST client.
+    private synchronized void makeSureInitialized() {
+        if (!initialized) {
+            init();
+            initialized = true;
+        }
+    }
+
+    private void init() {
+        fullSchema = EsUtil.genColumnsFromEs(ds.getEsRestClient(), name, null);
+        esTable = toEsTable();
+    }
+
+    @Override
+    public List<Column> getFullSchema() {
+        makeSureInitialized();
+        return fullSchema;
+    }
+
+    @Override
+    public List<Column> getBaseSchema() {
+        return getFullSchema();
+    }
+
+    @Override
+    public List<Column> getBaseSchema(boolean full) {
+        return getFullSchema();
+    }
+
+    @Override
+    public Column getColumn(String name) {
+        makeSureInitialized();
+        for (Column column : fullSchema) {
+            if (name.equals(column.getName())) {
+                return column;
+            }
+        }
+        return null;
+    }
+
+    /** Returns the {@link EsTable} built for this index, initializing it on first access. */
+    public EsTable getEsTable() {
+        makeSureInitialized();
+        return esTable;
+    }
+
+    @Override
+    public String getMysqlType() {
+        return type.name();
+    }
+
+    /** Returns the name of the database this table belongs to. */
+    public String getDbName() {
+        return dbName;
+    }
+
+    @Override
+    public TTableDescriptor toThrift() {
+        TEsTable tEsTable = new TEsTable();
+        // Go through getFullSchema(): this class only populates the fullSchema field inside init().
+        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ES_TABLE,
+                getFullSchema().size(), 0, getName(), "");
+        tTableDescriptor.setEsTable(tEsTable);
+        return tTableDescriptor;
+    }
+
+    private EsTable toEsTable() {
+        EsTable esTable = new EsTable(this.id, this.name, this.fullSchema, TableType.ES_EXTERNAL_TABLE);
+        esTable.setIndexName(name);
+        esTable.setClient(ds.getEsRestClient());
+        esTable.setUserName(ds.getUsername());
+        esTable.setPasswd(ds.getPassword());
+        esTable.setEnableDocValueScan(ds.isEnableDocValueScan());
+        esTable.setEnableKeywordSniff(ds.isEnableKeywordSniff());
+        esTable.setNodesDiscovery(ds.isEnableNodesDiscovery());
+        esTable.setHttpSslEnabled(ds.isEnableSsl());
+        esTable.setSeeds(ds.getNodes());
+        esTable.setHosts(String.join(",", ds.getNodes()));
+        esTable.syncTableMetaData();
+        return esTable;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/JsonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/JsonUtil.java
new file mode 100644
index 0000000000..76d913260f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/JsonUtil.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+/**
+ * Jackson-based JSON helpers that share one pre-configured {@link ObjectMapper}.
+ **/
+public class JsonUtil {
+
+    private static final Logger LOG = LogManager.getLogger(JsonUtil.class);
+
+    private static final ObjectMapper MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(MapperFeature.REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setTimeZone(TimeZone.getDefault()).setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
+
+    public static JsonNode toJsonNode(Object obj) {
+        return MAPPER.valueToTree(obj);
+    }
+
+    public static ArrayNode parseArray(String text) {
+        try {
+            return (ArrayNode) MAPPER.readTree(text);
+        } catch (Exception e) {
+            throw new RuntimeException("Json deserialization exception.", e);
+        }
+    }
+
+    public static <T> T readValue(String text, Class<T> clazz) throws JsonProcessingException {
+        return MAPPER.readValue(text, clazz);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index 5cae998bb3..f3669d325b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.AlterCatalogPropertyStmt;
import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.DropCatalogStmt;
import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.common.DdlException;
import java.util.Map;
@@ -56,11 +57,12 @@ public class CatalogFactory {
/**
* create the datasource instance from data source log.
*/
- public static DataSourceIf constructorFromLog(CatalogLog log) {
+ public static DataSourceIf constructorFromLog(CatalogLog log) throws DdlException {
return constructorDataSource(log.getCatalogId(), log.getCatalogName(), log.getProps());
}
- private static DataSourceIf constructorDataSource(long catalogId, String name, Map<String, String> props) {
+ private static DataSourceIf constructorDataSource(long catalogId, String name, Map<String, String> props)
+ throws DdlException {
String type = props.get("type");
DataSourceIf dataSource;
switch (type) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
index 3f66358d82..c9e58be38f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
@@ -124,6 +124,9 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
return Lists.newArrayList(idToCatalog.keySet());
}
+ /**
+ * Get db allow return null
+ **/
public DatabaseIf getDbNullable(long dbId) {
DatabaseIf db = internalDataSource.getDbNullable(dbId);
if (db != null) {
@@ -308,7 +311,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
/**
* Reply for create catalog event.
*/
- public void replayCreateCatalog(CatalogLog log) {
+ public void replayCreateCatalog(CatalogLog log) throws DdlException {
writeLock();
try {
DataSourceIf ds = CatalogFactory.constructorFromLog(log);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
index a07450a7cc..d531b4f153 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
@@ -18,43 +18,175 @@
package org.apache.doris.datasource;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.external.EsExternalDatabase;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.external.elasticsearch.EsRestClient;
+import org.apache.doris.external.elasticsearch.EsUtil;
+
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.Nullable;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* External data source for elasticsearch
*/
+@Getter
public class EsExternalDataSource extends ExternalDataSource {
+
+ public static final String DEFAULT_DB = "default";
+ private static final Logger LOG = LogManager.getLogger(EsExternalDataSource.class);
+ private static final String PROP_HOSTS = "elasticsearch.hosts";
+ private static final String PROP_USERNAME = "elasticsearch.username";
+ private static final String PROP_PASSWORD = "elasticsearch.password";
+ private static final String PROP_DOC_VALUE_SCAN = "elasticsearch.doc_value_scan";
+ private static final String PROP_KEYWORD_SNIFF = "elasticsearch.keyword_sniff";
+ private static final String PROP_NODES_DISCOVERY = "elasticsearch.nodes_discovery";
+ private static final String PROP_SSL = "elasticsearch.ssl";
+
+ // Cache of db name to db id.
+ private Map<String, Long> dbNameToId;
+ private Map<Long, EsExternalDatabase> idToDb;
+
+ private EsRestClient esRestClient;
+
+ private boolean initialized = false;
+
+ private String[] nodes;
+
+ private String username = "";
+
+ private String password = "";
+
+ private boolean enableDocValueScan = true;
+
+ private boolean enableKeywordSniff = true;
+
+ private boolean enableSsl = false;
+
+ private boolean enableNodesDiscovery = true;
+
/**
* Default constructor for EsExternalDataSource.
*/
- public EsExternalDataSource(long catalogId, String name, Map<String, String> props) {
+ public EsExternalDataSource(long catalogId, String name, Map<String, String> props) throws DdlException {
this.id = catalogId;
- setName(name);
- getDsProperty().setProperties(props);
- setType("es");
+ this.name = name;
+ this.type = "es";
+ validate(props);
+ this.dsProperty = new DataSourceProperty();
+ this.dsProperty.setProperties(props);
+ }
+
+ private void validate(Map<String, String> properties) throws DdlException {
+ if (properties == null) {
+ throw new DdlException(
+ "Please set properties of elasticsearch table, " + "they are: hosts, user, password, index");
+ }
+
+ if (StringUtils.isBlank(properties.get(PROP_HOSTS))) {
+ throw new DdlException("Hosts of ES table is null.");
+ }
+ nodes = properties.get(PROP_HOSTS).trim().split(",");
+
+ if (StringUtils.isNotBlank(properties.get(PROP_USERNAME))) {
+ username = properties.get(PROP_USERNAME).trim();
+ }
+
+ if (StringUtils.isNotBlank(properties.get(PROP_PASSWORD))) {
+ password = properties.get(PROP_PASSWORD).trim();
+ }
+
+ if (properties.containsKey(PROP_DOC_VALUE_SCAN)) {
+ enableDocValueScan = EsUtil.getBoolean(properties, PROP_DOC_VALUE_SCAN);
+ }
+
+ if (properties.containsKey(PROP_KEYWORD_SNIFF)) {
+ enableKeywordSniff = EsUtil.getBoolean(properties, PROP_KEYWORD_SNIFF);
+ }
+
+ if (properties.containsKey(PROP_NODES_DISCOVERY)) {
+ enableNodesDiscovery = EsUtil.getBoolean(properties, PROP_NODES_DISCOVERY);
+ }
+
+ if (properties.containsKey(PROP_SSL)) {
+ enableSsl = EsUtil.getBoolean(properties, PROP_SSL);
+ // check protocol
+ for (String seed : nodes) {
+ if (enableSsl && seed.startsWith("http://")) {
+ throw new DdlException("if ssl_enabled is true, the https protocol must be used");
+ }
+ if (!enableSsl && seed.startsWith("https://")) {
+ throw new DdlException("if ssl_enabled is false, the http protocol must be used");
+ }
+ }
+ }
+ }
+
+ /**
+ * Datasource can't be init when creating because the external datasource may depend on third system.
+ * So you have to make sure the client of third system is initialized before any method was called.
+ */
+ private synchronized void makeSureInitialized() {
+ if (!initialized) {
+ init();
+ initialized = true;
+ }
+ }
+
+ private void init() {
+ try {
+ validate(this.dsProperty.getProperties());
+ } catch (DdlException e) {
+ LOG.warn("validate error", e);
+ }
+ dbNameToId = Maps.newConcurrentMap();
+ idToDb = Maps.newConcurrentMap();
+ this.esRestClient = new EsRestClient(this.nodes, this.username, this.password, this.enableSsl);
+ long defaultDbId = Env.getCurrentEnv().getNextId();
+ dbNameToId.put(DEFAULT_DB, defaultDbId);
+ idToDb.put(defaultDbId, new EsExternalDatabase(this, defaultDbId, "default"));
}
@Override
public List<String> listDatabaseNames(SessionContext ctx) {
- return null;
+ makeSureInitialized();
+ return new ArrayList<>(dbNameToId.keySet());
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
- return null;
+ // esRestClient is created lazily in init(); without this call it may still be null.
+ makeSureInitialized();
+ // Every non-dot-prefixed ES index is exposed as a table; dbName is not consulted because
+ // this data source has a single database.
+ return esRestClient.getIndexes();
+ }
+
+ /**
+ * Returns the database registered under {@code dbName} (cluster prefix stripped),
+ * or null if there is none.
+ */
+ @Nullable
+ @Override
+ public ExternalDatabase getDbNullable(String dbName) {
+ makeSureInitialized();
+ String realDbName = ClusterNamespace.getNameFromFullName(dbName);
+ Long dbId = dbNameToId.get(realDbName);
+ if (dbId == null) {
+ return null;
+ }
+ // Return the instance cached in init() rather than building a new one per lookup,
+ // so callers always observe the same database object for a given id.
+ return idToDb.get(dbId);
}
@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
- return false;
+ // esRestClient is created lazily in init(); without this call it may still be null.
+ makeSureInitialized();
+ return esRestClient.existIndex(this.esRestClient.getClient(), tblName);
}
@Override
public List<Long> getDbIds() {
- // TODO: implement it
- return Lists.newArrayList();
+ // dbNameToId is only assigned in init(); without this call it may still be null.
+ makeSureInitialized();
+ return Lists.newArrayList(dbNameToId.values());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java
index f41837fcd8..0b556e46c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java
@@ -25,9 +25,8 @@ import java.util.List;
/**
* It is responsible for this class to schedule all network request sent to remote ES Cluster
* Request sequence
- * 1. GET /
- * 2. GET {index}/_mapping
- * 3. GET {index}/_search_shards
+ * 1. GET {index}/_mapping
+ * 2. GET {index}/_search_shards
* <p>
- * note: step 1 is not necessary
+ * note: both requests are required; the optional GET / version request was removed
*/
@@ -37,7 +36,6 @@ public class EsMetaStateTracker {
private SearchContext searchContext;
public EsMetaStateTracker(EsRestClient client, EsTable esTable) {
- builtinSearchPhase.add(new VersionPhase(client));
builtinSearchPhase.add(new MappingPhase(client));
builtinSearchPhase.add(new PartitionPhase(client));
searchContext = new SearchContext(esTable);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
index c156bb5bc9..77fbe3bff6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
@@ -17,6 +17,9 @@
package org.apache.doris.external.elasticsearch;
+import org.apache.doris.common.util.JsonUtil;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@@ -25,17 +28,15 @@ import org.apache.http.HttpHeaders;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
import java.io.IOException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
@@ -45,40 +46,41 @@ import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+/**
+ * For get es metadata by http/https.
+ **/
public class EsRestClient {
private static final Logger LOG = LogManager.getLogger(EsRestClient.class);
- private ObjectMapper mapper;
-
- {
- mapper = new ObjectMapper();
- mapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false);
- mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false);
- }
-
- private static OkHttpClient networkClient = new OkHttpClient.Builder()
- .readTimeout(10, TimeUnit.SECONDS)
- .build();
+ private static OkHttpClient networkClient = new OkHttpClient.Builder().readTimeout(10, TimeUnit.SECONDS).build();
private static OkHttpClient sslNetworkClient;
-
private Request.Builder builder;
private String[] nodes;
private String currentNode;
private int currentNodeIndex = 0;
private boolean httpSslEnable;
+ /**
+ * Creates a client for the given ES nodes.
+ *
+ * <p>Used by EsTable and by EsExternalDataSource. A basic-auth header is added only when
+ * both user and password are non-empty.
+ *
+ * @param nodes seed node addresses; requests start with {@code nodes[0]}
+ * @param authUser basic-auth user name, may be empty
+ * @param authPassword basic-auth password, may be empty
+ * @param httpSslEnable whether {@link #getClient()} returns the SSL client
+ **/
public EsRestClient(String[] nodes, String authUser, String authPassword, boolean httpSslEnable) {
this.nodes = nodes;
this.builder = new Request.Builder();
if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) {
- this.builder.addHeader(HttpHeaders.AUTHORIZATION,
- Credentials.basic(authUser, authPassword));
+ this.builder.addHeader(HttpHeaders.AUTHORIZATION, Credentials.basic(authUser, authPassword));
}
this.currentNode = nodes[currentNodeIndex];
this.httpSslEnable = httpSslEnable;
}
+ /**
+ * Returns the shared HTTP client matching this instance's SSL setting.
+ * The SSL client trusts all certificates and host names and is created lazily.
+ **/
+ public OkHttpClient getClient() {
+ if (httpSslEnable) {
+ return getOrCreateSslNetworkClient();
+ }
+ return networkClient;
+ }
+
private void selectNextNode() {
currentNodeIndex++;
// reroute, because the previously failed node may have already been restored
@@ -88,6 +90,9 @@ public class EsRestClient {
currentNode = nodes[currentNodeIndex];
}
+ /**
+ * Get http nodes.
+ **/
public Map<String, EsNodeInfo> getHttpNodes() throws DorisEsException {
Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes");
if (nodesData == null) {
@@ -104,26 +109,7 @@ public class EsRestClient {
}
/**
- * Get remote ES Cluster version
- *
- * @return
- * @throws Exception
- */
- public EsMajorVersion version() throws DorisEsException {
- Map<String, Object> result = get("/", null);
- if (result == null) {
- throw new DorisEsException("Unable to retrieve ES main cluster info.");
- }
- Map<String, String> versionBody = (Map<String, String>) result.get("version");
- return EsMajorVersion.parse(versionBody.get("number"));
- }
-
- /**
- * Get mapping for indexName
- *
- * @param indexName
- * @return
- * @throws Exception
+ * Get mapping for indexName.
*/
public String getMapping(String indexName) throws DorisEsException {
String path = indexName + "/_mapping";
@@ -134,14 +120,48 @@ public class EsRestClient {
return indexMapping;
}
+ /**
+ * Checks whether an index exists by requesting {@code {index}/_mapping} on the current node.
+ *
+ * @param httpClient client to send the request with (see {@link #getClient()})
+ * @param indexName index to check
+ * @return true if the request returned a 2xx status; false on any other status or I/O error
+ **/
+ public boolean existIndex(OkHttpClient httpClient, String indexName) {
+ String path = indexName + "/_mapping";
+ // try-with-resources closes the response so its connection is released back to OkHttp.
+ try (Response response = executeResponse(httpClient, path)) {
+ return response.isSuccessful();
+ } catch (IOException e) {
+ LOG.warn("existIndex error", e);
+ return false;
+ }
+ }
/**
- * Get Shard location
- *
- * @param indexName
- * @return
- * @throws DorisEsException
- */
+ * Lists index names via {@code _cat/indices}, sorted ascending by name.
+ *
+ * <p>Dot-prefixed indexes are skipped (e.g. .geoip_databases on ES 7.17, whose _mapping
+ * request returns 400).
+ *
+ * @return the visible index names
+ * @throws DorisEsException if the request returned no body
+ **/
+ public List<String> getIndexes() {
+ String indexes = execute("_cat/indices?h=index&format=json&s=index:asc");
+ if (indexes == null) {
+ throw new DorisEsException("get es indexes error");
+ }
+ List<String> ret = new ArrayList<>();
+ // The response is a JSON array with one object per index, each holding an "index" field.
+ ArrayNode jsonNodes = JsonUtil.parseArray(indexes);
+ jsonNodes.forEach(json -> {
+ // es 7.17 has .geoip_databases, but _mapping response 400.
+ String index = json.get("index").asText();
+ if (!index.startsWith(".")) {
+ ret.add(index);
+ }
+ });
+ return ret;
+ }
+
+
+ /**
+ * Get Shard location.
+ **/
public EsShardPartitions searchShards(String indexName) throws DorisEsException {
String path = indexName + "/_search_shards";
String searchShards = execute(path);
@@ -156,15 +176,25 @@ public class EsRestClient {
**/
- private synchronized OkHttpClient getOrCreateSslNetworkClient() {
- if (sslNetworkClient == null) {
- sslNetworkClient = new OkHttpClient.Builder()
- .readTimeout(10, TimeUnit.SECONDS)
- .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts())
- .hostnameVerifier(new TrustAllHostnameVerifier())
- .build();
- }
- return sslNetworkClient;
- }
+ private OkHttpClient getOrCreateSslNetworkClient() {
+ // sslNetworkClient is static, so lock on the class: a synchronized instance method only
+ // serializes callers sharing one EsRestClient, letting separate instances race here.
+ synchronized (EsRestClient.class) {
+ if (sslNetworkClient == null) {
+ sslNetworkClient = new OkHttpClient.Builder().readTimeout(10, TimeUnit.SECONDS)
+ .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts())
+ .hostnameVerifier(new TrustAllHostnameVerifier()).build();
+ }
+ return sslNetworkClient;
+ }
+ }
+ /**
+ * Sends a GET for {@code path} to the current node and returns the raw response.
+ * The caller must close the returned response.
+ *
+ * <p>Normalizes {@code currentNode} in place: trims it and prefixes {@code http://}
+ * when no scheme is given.
+ *
+ * @throws IOException if the request cannot be executed
+ */
+ private Response executeResponse(OkHttpClient httpClient, String path) throws IOException {
+ currentNode = currentNode.trim();
+ if (!(currentNode.startsWith("http://") || currentNode.startsWith("https://"))) {
+ currentNode = "http://" + currentNode;
+ }
+ String url = currentNode + "/" + path;
+ Request request = builder.get().url(url).build();
+ // Called for every request issued by execute() and existIndex(), so log at debug
+ // level rather than info to avoid flooding the FE log.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("es rest client request URL: {}", url);
+ }
+ return httpClient.newCall(request).execute();
+ }
+
/**
* execute request for specific path,it will try again nodes.length times if it fails
*
@@ -187,29 +217,16 @@ public class EsRestClient {
// User may set a config like described below:
// hosts: "http://192.168.0.1:8200, http://192.168.0.2:8200"
// then currentNode will be "http://192.168.0.1:8200", " http://192.168.0.2:8200"
- currentNode = currentNode.trim();
- if (!(currentNode.startsWith("http://") || currentNode.startsWith("https://"))) {
- currentNode = "http://" + currentNode;
- }
- Request request = builder.get()
- .url(currentNode + "/" + path)
- .build();
- Response response = null;
if (LOG.isTraceEnabled()) {
LOG.trace("es rest client request URL: {}", currentNode + "/" + path);
}
- try {
- response = httpClient.newCall(request).execute();
+ try (Response response = executeResponse(httpClient, path)) {
if (response.isSuccessful()) {
return response.body().string();
}
} catch (IOException e) {
LOG.warn("request node [{}] [{}] failures {}, try next nodes", currentNode, path, e);
scratchExceptionForThrow = new DorisEsException(e.getMessage());
- } finally {
- if (response != null) {
- response.close();
- }
}
selectNextNode();
}
@@ -226,10 +243,9 @@ public class EsRestClient {
@SuppressWarnings("unchecked")
private <T> T parseContent(String response, String key) {
- Map<String, Object> map = Collections.emptyMap();
+ Map<String, Object> map;
try {
- JsonParser jsonParser = mapper.getJsonFactory().createJsonParser(response);
- map = mapper.readValue(jsonParser, Map.class);
+ map = JsonUtil.readValue(response, Map.class);
} catch (IOException ex) {
LOG.error("parse es response failure: [{}]", response);
throw new DorisEsException(ex.getMessage());
@@ -262,7 +278,7 @@ public class EsRestClient {
SSLSocketFactory ssfFactory;
try {
SSLContext sc = SSLContext.getInstance("TLS");
- sc.init(null, new TrustManager[]{new TrustAllCerts()}, new SecureRandom());
+ sc.init(null, new TrustManager[] {new TrustAllCerts()}, new SecureRandom());
ssfFactory = sc.getSocketFactory();
} catch (Exception e) {
throw new DorisEsException("Errors happens when create ssl socket");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
index 2b098a669a..3537c1ad3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
@@ -45,12 +45,16 @@ import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder;
import org.apache.doris.thrift.TExprOpcode;
import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -58,6 +62,8 @@ import java.util.stream.Collectors;
**/
public class EsUtil {
+ private static final Logger LOG = LogManager.getLogger(EsUtil.class);
+
/**
* Analyze partition and distributionDesc.
**/
@@ -143,6 +149,18 @@ public class EsUtil {
// Elasticsearch 8.x, include_type_name parameter is removed
if (rootSchema == null) {
properties = (JSONObject) mappings.get("properties");
+ // Compatible es6 with no type passed in.
+ if (mappingType == null) {
+ String typeKey = (String) mappings.keySet().iterator().next();
+ JSONObject typeProps = (JSONObject) ((JSONObject) mappings.get(typeKey)).get("properties");
+ if (typeProps != null) {
+ properties = typeProps;
+ if (properties.containsKey("mappings")) {
+ properties.remove("mappings");
+ properties.remove("settings");
+ }
+ }
+ }
} else {
properties = (JSONObject) rootSchema.get("properties");
}
@@ -349,6 +367,33 @@ public class EsUtil {
return null;
}
+
+ /**
+ * Builds Doris columns from the live mapping of {@code indexName}.
+ *
+ * <p>Only fields that declare a {@code type} whose Doris equivalent is valid are kept;
+ * fields without a {@code type} are skipped (treated as unsupported complex types).
+ * Every generated column is a nullable key column.
+ *
+ * @param client client used to fetch the index mapping
+ * @param indexName ES index to read
+ * @param mappingType ES mapping type; may be null, which getMappingProps handles
+ * @return the generated columns
+ **/
+ public static List<Column> genColumnsFromEs(EsRestClient client, String indexName, String mappingType) {
+ String mapping = client.getMapping(indexName);
+ JSONObject mappingProps = EsUtil.getMappingProps(indexName, mapping, mappingType);
+ // NOTE(review): if JSONObject is json-simple's HashMap-backed type, this key order (and
+ // thus column order) is hash order rather than mapping order; confirm that is acceptable.
+ Set<String> keys = (Set<String>) mappingProps.keySet();
+ List<Column> columns = new ArrayList<>();
+ for (String key : keys) {
+ JSONObject field = (JSONObject) mappingProps.get(key);
+ // Complex types are not currently supported.
+ if (field.containsKey("type")) {
+ Type type = toDorisType(field.get("type").toString());
+ if (!type.isInvalid()) {
+ Column column = new Column();
+ column.setName(key);
+ column.setType(type);
+ column.setIsKey(true);
+ column.setIsAllowNull(true);
+ columns.add(column);
+ }
+ }
+ }
+ return columns;
+ }
+
/**
* Transfer es type to doris type.
**/
@@ -429,8 +474,7 @@ public class EsUtil {
if (StringUtils.isNotBlank(type)) {
initScrollUrl.append("/").append(type);
}
- initScrollUrl.append("/_search?").append(filterPath).append("&terminate_after=")
- .append(batchSize);
+ initScrollUrl.append("/_search?").append(filterPath).append("&terminate_after=").append(batchSize);
nextScrollUrl.append("/_search/scroll?").append(filterPath);
return new EsUrls(null, initScrollUrl.toString(), nextScrollUrl.toString());
} else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java
deleted file mode 100644
index 8d4e91186e..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.external.elasticsearch;
-
-/**
- * Request version from remote ES Cluster. If request fails, set the version with `LATEST`
- */
-public class VersionPhase implements SearchPhase {
-
- private EsRestClient client;
-
- private boolean isVersionSet = false;
-
-
- public VersionPhase(EsRestClient client) {
- this.client = client;
- }
-
- @Override
- public void preProcess(SearchContext context) {
- if (context.esTable().esVersion() != null) {
- isVersionSet = true;
- context.version(context.esTable().esVersion());
- }
- }
-
- @Override
- public void execute(SearchContext context) {
- if (isVersionSet) {
- return;
- }
- EsMajorVersion version;
- try {
- version = client.version();
- } catch (Throwable e) {
- version = EsMajorVersion.LATEST;
- }
- context.version(version);
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
index b6dedd2eaf..d2ff066c22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.catalog.external.EsExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.external.elasticsearch.EsShardPartitions;
@@ -83,8 +84,20 @@ public class EsScanNode extends ScanNode {
private boolean isFinalized = false;
+ /**
+ * Creates a scan node over an internal {@link EsTable}.
+ **/
public EsScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
+ this(id, desc, planNodeName, false);
+ }
+
+ /**
+ * Creates a scan node over an internal {@link EsTable} or, when {@code esExternalTable}
+ * is true, over an {@link EsExternalTable} from an ES catalog, whose backing EsTable is
+ * obtained via {@code getEsTable()}.
+ *
+ * @param esExternalTable true if {@code desc.getTable()} is an EsExternalTable
+ **/
+ public EsScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, boolean esExternalTable) {
super(id, desc, planNodeName, StatisticalType.ES_SCAN_NODE);
- table = (EsTable) (desc.getTable());
+ if (esExternalTable) {
+ EsExternalTable externalTable = (EsExternalTable) (desc.getTable());
+ table = externalTable.getEsTable();
+ } else {
+ table = (EsTable) (desc.getTable());
+ }
esTablePartitions = table.getEsTablePartitions();
}
@@ -134,7 +147,7 @@ public class EsScanNode extends ScanNode {
for (SlotDescriptor slotDescriptor : slotDescriptors) {
selectedFields.add(slotDescriptor.getColumn().getName());
}
- if (selectedFields.size() > table.maxDocValueFields()) {
+ if (selectedFields.size() > table.getMaxDocValueFields()) {
return 0;
}
Set<String> docValueFields = docValueContext.keySet();
@@ -159,14 +172,14 @@ public class EsScanNode extends ScanNode {
properties.put(EsTable.HTTP_SSL_ENABLED, String.valueOf(table.isHttpSslEnabled()));
TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt());
esScanNode.setProperties(properties);
- if (table.isDocValueScanEnable()) {
+ if (table.isEnableDocValueScan()) {
esScanNode.setDocvalueContext(table.docValueContext());
properties.put(EsTable.DOC_VALUES_MODE, String.valueOf(useDocValueScan(desc, table.docValueContext())));
}
properties.put(EsTable.ES_DSL, queryBuilder.toJson());
// Be use it add es host_port and shardId to query.
- EsUrls esUrls = EsUtil.genEsUrls(table.getIndexName(), table.getMappingType(), table.isDocValueScanEnable(),
+ EsUrls esUrls = EsUtil.genEsUrls(table.getIndexName(), table.getMappingType(), table.isEnableDocValueScan(),
ConnectContext.get().getSessionVariable().batchSize, msg.limit);
if (esUrls.getSearchUrl() != null) {
properties.put(EsTable.SEARCH_URL, esUrls.getSearchUrl());
@@ -174,7 +187,7 @@ public class EsScanNode extends ScanNode {
properties.put(EsTable.INIT_SCROLL_URL, esUrls.getInitScrollUrl());
properties.put(EsTable.NEXT_SCROLL_URL, esUrls.getNextScrollUrl());
}
- if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) {
+ if (table.isEnableKeywordSniff() && table.fieldsContext().size() > 0) {
esScanNode.setFieldsContext(table.fieldsContext());
}
msg.es_scan_node = esScanNode;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index dc9cfac50d..35b5defcec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1727,6 +1727,9 @@ public class SingleNodePlanner {
case HMS_EXTERNAL_TABLE:
scanNode = new ExternalFileScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HMS_FILE_SCAN_NODE");
break;
+ case ES_EXTERNAL_TABLE:
+ scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "EsScanNode", true);
+ break;
default:
break;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/VersionPhaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/VersionPhaseTest.java
deleted file mode 100644
index a3228820f2..0000000000
--- a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/VersionPhaseTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.external.elasticsearch;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.EsTable;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.ExceptionChecker;
-
-import mockit.Expectations;
-import mockit.Injectable;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class VersionPhaseTest extends EsTestCase {
-
- @Test
- public void testWorkFlow(@Injectable EsRestClient client) throws Exception {
- List<Column> columns = new ArrayList<>();
- Column k1 = new Column("k1", PrimitiveType.BIGINT);
- columns.add(k1);
- EsTable esTableBefore7X = fakeEsTable("fake", "test", "doc", columns);
- SearchContext context = new SearchContext(esTableBefore7X);
-
- new Expectations(client) {
- {
- client.version();
- minTimes = 0;
- result = EsMajorVersion.V_6_X;
- }
- };
- VersionPhase versionPhase = new VersionPhase(client);
- ExceptionChecker.expectThrowsNoException(() -> versionPhase.preProcess(context));
- ExceptionChecker.expectThrowsNoException(() -> versionPhase.execute(context));
- Assert.assertTrue(context.version().on(EsMajorVersion.V_6_X));
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org