You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/31 03:32:50 UTC
[incubator-doris] branch master updated: [feature-wip](multi-catalog) Add basic class and interface for multi catalog support
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new e2b93a4165 [feature-wip](multi-catalog) Add basic class and interface for multi catalog support
e2b93a4165 is described below
commit e2b93a41655e47670173150a9f09dac0cb09d6f8
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue May 31 11:32:44 2022 +0800
[feature-wip](multi-catalog) Add basic class and interface for multi catalog support
---
.../java/org/apache/doris/catalog/Catalog.java | 2 +-
.../java/org/apache/doris/catalog/DatabaseIf.java | 109 ++++++++++
.../java/org/apache/doris/catalog/TableIf.java | 77 +++++++
.../doris/catalog/external/ExternalDatabase.java | 221 +++++++++++++++++++++
.../doris/catalog/external/ExternalTable.java | 150 ++++++++++++++
.../doris/datasource/DataSourceException.java | 30 +++
.../org/apache/doris/datasource/DataSourceIf.java | 64 ++++++
.../org/apache/doris/datasource/DataSourceMgr.java | 70 +++++++
.../doris/datasource/EsExternalDataSource.java | 64 ++++++
.../doris/datasource/ExternalDataSource.java | 175 ++++++++++++++++
.../doris/datasource/HMSExternalDataSource.java | 74 +++++++
.../doris/datasource/InternalDataSource.java | 107 ++++++++++
.../org/apache/doris/datasource/MetaObjCache.java | 32 +++
.../apache/doris/datasource/SessionContext.java | 25 +++
.../apache/doris/external/ExternalScanRange.java | 26 +++
.../doris/planner/external/ExternalScanNode.java | 50 +++++
.../java/org/apache/doris/qe/ConnectContext.java | 7 +
.../main/java/org/apache/doris/qe/Coordinator.java | 2 +-
gensrc/thrift/PlanNodes.thrift | 11 +
19 files changed, 1294 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 45dc0ad90d..94c42a4786 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -1702,7 +1702,7 @@ public class Catalog {
LOG.info("finished replay masterInfo from image");
return newChecksum;
}
-
+
public long loadFrontends(DataInputStream dis, long checksum) throws IOException {
int size = dis.readInt();
long newChecksum = checksum ^ size;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
new file mode 100644
index 0000000000..060f96bcff
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Database interface.
+ * TODO:
+ * I just copied some common interface from the origin Database class.
+ * Maybe changed later.
+ */
+public interface DatabaseIf {
+
+ void readLock();
+
+ void readUnlock();
+
+ void writeLock();
+
+ void writeUnlock();
+
+ boolean tryWriteLock(long timeout, TimeUnit unit);
+
+ boolean isWriteLockHeldByCurrentThread();
+
+ boolean writeLockIfExist();
+
+ <E extends Exception>
+ void writeLockOrException(E e) throws E;
+
+ void writeLockOrDdlException() throws DdlException;
+
+ long getId();
+
+ String getFullName();
+
+ DatabaseProperty getDbProperties();
+
+ boolean isTableExist(String tableName);
+
+ List<Table> getTables();
+
+ List<Table> getTablesOnIdOrder();
+
+ List<Table> getViews();
+
+ List<Table> getTablesOnIdOrderIfExist(List<Long> tableIdList);
+
+ List<Table> getTablesOnIdOrderOrThrowException(List<Long> tableIdList) throws MetaNotFoundException;
+
+ Set<String> getTableNamesWithLock();
+
+ Table getTableNullable(String tableName);
+
+ Optional<Table> getTable(String tableName);
+
+ Optional<Table> getTable(long tableId);
+
+ <E extends Exception>
+ Table getTableOrException(String tableName, java.util.function.Function<String, E> e) throws E;
+
+ <E extends Exception>
+ Table getTableOrException(long tableId, java.util.function.Function<Long, E> e) throws E;
+
+ Table getTableOrMetaException(String tableName) throws MetaNotFoundException;
+
+ Table getTableOrMetaException(long tableId) throws MetaNotFoundException;
+
+ @SuppressWarnings("unchecked")
+ <T extends Table>
+ T getTableOrMetaException(String tableName, Table.TableType tableType) throws MetaNotFoundException;
+
+ @SuppressWarnings("unchecked")
+ <T extends Table>
+ T getTableOrMetaException(long tableId, Table.TableType tableType) throws MetaNotFoundException;
+
+ Table getTableOrDdlException(String tableName) throws DdlException;
+
+ Table getTableOrDdlException(long tableId) throws DdlException;
+
+ Table getTableOrAnalysisException(String tableName) throws AnalysisException;
+
+ OlapTable getOlapTableOrAnalysisException(String tableName) throws AnalysisException;
+
+ Table getTableOrAnalysisException(long tableId) throws AnalysisException;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
new file mode 100644
index 0000000000..109fb3f59c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.alter.AlterCancelException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Table abstraction: locking operations, identity (id, name, type) and schema access.
+ */
+public interface TableIf {
+
+    // ---------- read lock ----------
+
+    void readLock();
+
+    boolean tryReadLock(long timeout, TimeUnit unit);
+
+    void readUnlock();
+
+    // ---------- write lock ----------
+
+    void writeLock();
+
+    boolean writeLockIfExist();
+
+    boolean tryWriteLock(long timeout, TimeUnit unit);
+
+    void writeUnlock();
+
+    boolean isWriteLockHeldByCurrentThread();
+
+    /** Write-lock variant that is declared to throw the caller-supplied exception type. */
+    <E extends Exception> void writeLockOrException(E e) throws E;
+
+    void writeLockOrDdlException() throws DdlException;
+
+    void writeLockOrMetaException() throws MetaNotFoundException;
+
+    void writeLockOrAlterCancelException() throws AlterCancelException;
+
+    boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws MetaNotFoundException;
+
+    /** Timed write-lock variant that is declared to throw the caller-supplied exception type. */
+    <E extends Exception> boolean tryWriteLockOrException(long timeout, TimeUnit unit, E e) throws E;
+
+    boolean tryWriteLockIfExist(long timeout, TimeUnit unit);
+
+    // ---------- identity ----------
+
+    long getId();
+
+    String getName();
+
+    Table.TableType getType();
+
+    // ---------- schema ----------
+
+    List<Column> getFullSchema();
+
+    List<Column> getBaseSchema();
+
+    List<Column> getBaseSchema(boolean full);
+
+    void setNewFullSchema(List<Column> newSchema);
+
+    Column getColumn(String name);
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
new file mode 100644
index 0000000000..81f51de688
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.DatabaseProperty;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.datasource.ExternalDataSource;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+
+/**
+ * {@link DatabaseIf} implementation for a database that lives in an {@link ExternalDataSource}.
+ * It exposes the same surface that was extracted from {@link Database}.
+ *
+ * <p>Work in progress: locking is a no-op and, apart from {@link #isTableExist(String)}, the
+ * lookup methods are placeholders that return {@code null} or an empty {@link Optional}.
+ */
+public class ExternalDatabase implements DatabaseIf {
+
+    // Logger is keyed on this class so that messages are attributed to ExternalDatabase.
+    private static final Logger LOG = LogManager.getLogger(ExternalDatabase.class);
+
+    private long id;
+    private String name;
+    private ReentrantReadWriteLock rwLock;
+
+    private ExternalDataSource extDataSource;
+
+    public ExternalDatabase() {
+
+    }
+
+    @Override
+    public void readLock() {
+        // No-op: locking is not implemented yet.
+    }
+
+    @Override
+    public void readUnlock() {
+        // No-op: locking is not implemented yet.
+    }
+
+    @Override
+    public void writeLock() {
+        // No-op: locking is not implemented yet.
+    }
+
+    @Override
+    public void writeUnlock() {
+        // No-op: locking is not implemented yet.
+    }
+
+    /** Always reports success; no lock is actually taken. */
+    @Override
+    public boolean tryWriteLock(long timeout, TimeUnit unit) {
+        return true;
+    }
+
+    /** Always {@code false}; no lock is actually taken by this class. */
+    @Override
+    public boolean isWriteLockHeldByCurrentThread() {
+        return false;
+    }
+
+    /** Always reports success; no lock is actually taken. */
+    @Override
+    public boolean writeLockIfExist() {
+        return true;
+    }
+
+    @Override
+    public <E extends Exception> void writeLockOrException(E e) throws E {
+        // No-op: never throws e.
+    }
+
+    @Override
+    public void writeLockOrDdlException() throws DdlException {
+        // No-op: never throws.
+    }
+
+    /**
+     * @return the id field of this database ({@code 0} until it is assigned)
+     */
+    @Override
+    public long getId() {
+        return id;
+    }
+
+    /**
+     * @return the name field of this database ({@code null} until it is assigned); this is the
+     *         same name that {@link #isTableExist(String)} passes to the data source
+     */
+    @Override
+    public String getFullName() {
+        return name;
+    }
+
+    @Override
+    public DatabaseProperty getDbProperties() {
+        return null;
+    }
+
+    /**
+     * Asks the external data source whether {@code tableName} exists in this database, using the
+     * session context of the current connection.
+     *
+     * <p>NOTE(review): {@code extDataSource} is not assigned anywhere in this class and the result
+     * of {@code ConnectContext.get()} is dereferenced unchecked -- confirm both are non-null before
+     * this method is called.
+     */
+    @Override
+    public boolean isTableExist(String tableName) {
+        return extDataSource.tableExist(ConnectContext.get().getSessionContext(), name, tableName);
+    }
+
+    // ---------- placeholders: table listing ----------
+
+    @Override
+    public List<Table> getTables() {
+        return null;
+    }
+
+    @Override
+    public List<Table> getTablesOnIdOrder() {
+        return null;
+    }
+
+    @Override
+    public List<Table> getViews() {
+        return null;
+    }
+
+    @Override
+    public List<Table> getTablesOnIdOrderIfExist(List<Long> tableIdList) {
+        return null;
+    }
+
+    @Override
+    public List<Table> getTablesOnIdOrderOrThrowException(List<Long> tableIdList) throws MetaNotFoundException {
+        return null;
+    }
+
+    @Override
+    public Set<String> getTableNamesWithLock() {
+        return null;
+    }
+
+    // ---------- placeholders: single-table lookup ----------
+
+    @Override
+    public Table getTableNullable(String tableName) {
+        return null;
+    }
+
+    @Override
+    public Optional<Table> getTable(String tableName) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Table> getTable(long tableId) {
+        return Optional.empty();
+    }
+
+    @Override
+    public <E extends Exception> Table getTableOrException(String tableName, Function<String, E> e) throws E {
+        return null;
+    }
+
+    @Override
+    public <E extends Exception> Table getTableOrException(long tableId, Function<Long, E> e) throws E {
+        return null;
+    }
+
+    @Override
+    public Table getTableOrMetaException(String tableName) throws MetaNotFoundException {
+        return null;
+    }
+
+    @Override
+    public Table getTableOrMetaException(long tableId) throws MetaNotFoundException {
+        return null;
+    }
+
+    @Override
+    public <T extends Table> T getTableOrMetaException(String tableName, Table.TableType tableType)
+            throws MetaNotFoundException {
+        return null;
+    }
+
+    @Override
+    public <T extends Table> T getTableOrMetaException(long tableId, Table.TableType tableType)
+            throws MetaNotFoundException {
+        return null;
+    }
+
+    @Override
+    public Table getTableOrDdlException(String tableName) throws DdlException {
+        return null;
+    }
+
+    @Override
+    public Table getTableOrDdlException(long tableId) throws DdlException {
+        return null;
+    }
+
+    @Override
+    public Table getTableOrAnalysisException(String tableName) throws AnalysisException {
+        return null;
+    }
+
+    @Override
+    public OlapTable getOlapTableOrAnalysisException(String tableName) throws AnalysisException {
+        return null;
+    }
+
+    @Override
+    public Table getTableOrAnalysisException(long tableId) throws AnalysisException {
+        return null;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
new file mode 100644
index 0000000000..f87d955220
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.alter.AlterCancelException;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An external table is a table that is not self-managed by Doris,
+ * such as a table from hive, iceberg, es, etc.
+ *
+ * <p>Work in progress: locking is a no-op and the metadata accessors are placeholders.
+ * Because the blocking lock methods return normally without taking any lock, the timed and
+ * conditional variants report success as well, so both families give callers the same answer.
+ */
+public class ExternalTable implements TableIf {
+
+    @Override
+    public void readLock() {
+        // No-op: locking is not implemented yet.
+    }
+
+    /** Always reports success; no lock is actually taken. */
+    @Override
+    public boolean tryReadLock(long timeout, TimeUnit unit) {
+        return true;
+    }
+
+    @Override
+    public void readUnlock() {
+        // No-op: locking is not implemented yet.
+    }
+
+    @Override
+    public void writeLock() {
+        // No-op: locking is not implemented yet.
+    }
+
+    /** Always reports success; no lock is actually taken. */
+    @Override
+    public boolean writeLockIfExist() {
+        return true;
+    }
+
+    /** Always reports success; no lock is actually taken. */
+    @Override
+    public boolean tryWriteLock(long timeout, TimeUnit unit) {
+        return true;
+    }
+
+    @Override
+    public void writeUnlock() {
+        // No-op: locking is not implemented yet.
+    }
+
+    /** Always {@code false}; no lock is actually taken by this class. */
+    @Override
+    public boolean isWriteLockHeldByCurrentThread() {
+        return false;
+    }
+
+    @Override
+    public <E extends Exception> void writeLockOrException(E e) throws E {
+        // No-op: never throws e.
+    }
+
+    @Override
+    public void writeLockOrDdlException() throws DdlException {
+        // No-op: never throws.
+    }
+
+    @Override
+    public void writeLockOrMetaException() throws MetaNotFoundException {
+        // No-op: never throws.
+    }
+
+    @Override
+    public void writeLockOrAlterCancelException() throws AlterCancelException {
+        // No-op: never throws.
+    }
+
+    /** Always reports success and never throws; no lock is actually taken. */
+    @Override
+    public boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws MetaNotFoundException {
+        return true;
+    }
+
+    /** Always reports success and never throws {@code e}; no lock is actually taken. */
+    @Override
+    public <E extends Exception> boolean tryWriteLockOrException(long timeout, TimeUnit unit, E e) throws E {
+        return true;
+    }
+
+    /** Always reports success; no lock is actually taken. */
+    @Override
+    public boolean tryWriteLockIfExist(long timeout, TimeUnit unit) {
+        return true;
+    }
+
+    // ---------- placeholders: identity and schema ----------
+
+    @Override
+    public long getId() {
+        return 0;
+    }
+
+    @Override
+    public String getName() {
+        return null;
+    }
+
+    @Override
+    public Table.TableType getType() {
+        return null;
+    }
+
+    @Override
+    public List<Column> getFullSchema() {
+        return null;
+    }
+
+    @Override
+    public List<Column> getBaseSchema() {
+        return null;
+    }
+
+    @Override
+    public List<Column> getBaseSchema(boolean full) {
+        return null;
+    }
+
+    @Override
+    public void setNewFullSchema(List<Column> newSchema) {
+        // No-op: the schema is not stored yet.
+    }
+
+    @Override
+    public Column getColumn(String name) {
+        return null;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceException.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceException.java
new file mode 100644
index 0000000000..86e3824f19
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceException.java
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.UserException;
+
+/**
+ * Exception raised for data source related errors; a {@link UserException} subtype.
+ */
+public class DataSourceException extends UserException {
+
+    /**
+     * @param msg description of the failure
+     */
+    public DataSourceException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * @param msg description of the failure
+     * @param cause underlying error that led to this exception
+     */
+    public DataSourceException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceIf.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceIf.java
new file mode 100644
index 0000000000..384c6e3099
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceIf.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+/**
+ *
+ */
+public interface DataSourceIf {
+
+ // Type of this data source
+ String getType();
+
+ // Name of this data source
+ String getName();
+
+ @Nullable
+ Database getDbNullable(String dbName);
+
+ @Nullable
+ Database getDbNullable(long dbId);
+
+ Optional<Database> getDb(String dbName);
+
+ Optional<Database> getDb(long dbId);
+
+ <E extends Exception> Database getDbOrException(String dbName, java.util.function.Function<String, E> e) throws E;
+
+ <E extends Exception> Database getDbOrException(long dbId, java.util.function.Function<Long, E> e) throws E;
+
+ Database getDbOrMetaException(String dbName) throws MetaNotFoundException;
+
+ Database getDbOrMetaException(long dbId) throws MetaNotFoundException;
+
+ Database getDbOrDdlException(String dbName) throws DdlException;
+
+ Database getDbOrDdlException(long dbId) throws DdlException;
+
+ Database getDbOrAnalysisException(String dbName) throws AnalysisException;
+
+ Database getDbOrAnalysisException(long dbId) throws AnalysisException;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
new file mode 100644
index 0000000000..b735413070
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.MetaNotFoundException;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+/**
+ * DataSourceMgr will loaded all data sources at FE startup,
+ * and save them in maps mapping with id and name.
+ */
+public class DataSourceMgr {
+
+ private Map<Long, ExternalDataSource> idToDataSource = Maps.newConcurrentMap();
+ private Map<String, ExternalDataSource> nameToDataSource = Maps.newConcurrentMap();
+
+ public DataSourceMgr() {
+ loadDataSources();
+ }
+
+ public void loadDataSources() {
+ // TODO: Actually, we should initialize the data source object where user executing "create catalog" cmd,
+ // not loaded from config file.
+ }
+
+ private void registerDataSource(ExternalDataSource ds) {
+ ds.setId(Catalog.getCurrentCatalog().getNextId());
+ idToDataSource.put(ds.getId(), ds);
+ nameToDataSource.put(ds.getName(), ds);
+ }
+
+ public <E extends MetaNotFoundException> ExternalDataSource getDataSourceOrException(long id, java.util.function.Function<Long, E> e) throws E {
+ ExternalDataSource ds = idToDataSource.get(id);
+ if (ds == null) {
+ throw e.apply(id);
+ }
+ return ds;
+ }
+
+ public <E extends MetaNotFoundException> ExternalDataSource getDataSourceOrException(String name, java.util.function.Function<String, E> e) throws E {
+ ExternalDataSource ds = nameToDataSource.get(name);
+ if (ds == null) {
+ throw e.apply(name);
+ }
+ return ds;
+ }
+
+ public boolean hasDataSource(String name) {
+ return nameToDataSource.containsKey(name);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
new file mode 100644
index 0000000000..983e4dd6fd
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.external.ExternalScanRange;
+
+import java.util.List;
+
+/**
+ * External data source for elasticsearch.
+ *
+ * <p>Work in progress: type and name are both the fixed string "es" and the metadata methods are
+ * placeholders.
+ */
+public class EsExternalDataSource extends ExternalDataSource {
+
+    private static final String ES = "es";
+
+    @Override
+    public String getType() {
+        return ES;
+    }
+
+    @Override
+    public String getName() {
+        return ES;
+    }
+
+    // ---------- placeholders ----------
+
+    @Override
+    public List<String> listDatabaseNames(SessionContext ctx) {
+        return null;
+    }
+
+    @Override
+    public List<String> listTableNames(SessionContext ctx, String dbName) {
+        return null;
+    }
+
+    @Override
+    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
+        return false;
+    }
+
+    @Override
+    public List<Column> getSchema(SessionContext ctx, String dbName, String tblName) {
+        return null;
+    }
+
+    @Override
+    public List<ExternalScanRange> getExternalScanRanges(SessionContext ctx) {
+        return null;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java
new file mode 100644
index 0000000000..f75409f30b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.external.ExternalScanRange;
+
+import com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Base class for every kind of external data source.
+ *
+ * <p>Subclasses supply the metadata operations declared abstract here. The {@link DataSourceIf}
+ * database lookups are placeholders for now and return {@code null} or an empty {@link Optional}.
+ */
+public abstract class ExternalDataSource implements DataSourceIf {
+
+    // Unique id of this data source, will be assigned after data source is loaded.
+    private long id;
+
+    // Properties of this data source, such as hive meta store url.
+    private Map<String, String> properties = Maps.newHashMap();
+
+    private Map<Long, ExternalDatabase> idToDbs = Maps.newConcurrentMap();
+    private Map<String, ExternalDatabase> nameToDbs = Maps.newConcurrentMap();
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    /**
+     * @param key property name
+     * @return the value stored for {@code key}, or {@code null} if there is none
+     */
+    public String getProperty(String key) {
+        return properties.get(key);
+    }
+
+    /**
+     * @param key property name
+     * @return the value stored for {@code key}
+     * @throws DataSourceException if the properties contain no entry for {@code key}
+     */
+    public String getPropertyOrException(String key) throws DataSourceException {
+        if (!properties.containsKey(key)) {
+            throw new DataSourceException("Not found property " + key + " in data source " + getName());
+        }
+        return properties.get(key);
+    }
+
+    /**
+     * @param ctx session context of the caller
+     * @return names of database in this data source.
+     */
+    public abstract List<String> listDatabaseNames(SessionContext ctx);
+
+    /**
+     * @param ctx session context of the caller
+     * @param dbName name of the database
+     * @return names of tables in specified database
+     */
+    public abstract List<String> listTableNames(SessionContext ctx, String dbName);
+
+    /**
+     * Checks whether the specified table exists.
+     *
+     * @param ctx session context of the caller
+     * @param dbName name of the database
+     * @param tblName name of the table
+     * @return true if table exists, false otherwise
+     */
+    public abstract boolean tableExist(SessionContext ctx, String dbName, String tblName);
+
+    /**
+     * Gets the schema of the specified table.
+     *
+     * @param ctx session context of the caller
+     * @param dbName name of the database
+     * @param tblName name of the table
+     * @return list of columns as table's schema
+     */
+    public abstract List<Column> getSchema(SessionContext ctx, String dbName, String tblName);
+
+    /**
+     * @param ctx session context of the caller
+     * @return list of ExternalScanRange
+     */
+    public abstract List<ExternalScanRange> getExternalScanRanges(SessionContext ctx);
+
+    // ---------- DataSourceIf placeholders ----------
+
+    @Override
+    public String getType() {
+        return null;
+    }
+
+    @Override
+    public String getName() {
+        return null;
+    }
+
+    @Nullable
+    @Override
+    public Database getDbNullable(String dbName) {
+        return null;
+    }
+
+    @Nullable
+    @Override
+    public Database getDbNullable(long dbId) {
+        return null;
+    }
+
+    @Override
+    public Optional<Database> getDb(String dbName) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Database> getDb(long dbId) {
+        return Optional.empty();
+    }
+
+    @Override
+    public <E extends Exception> Database getDbOrException(String dbName, Function<String, E> e) throws E {
+        return null;
+    }
+
+    @Override
+    public <E extends Exception> Database getDbOrException(long dbId, Function<Long, E> e) throws E {
+        return null;
+    }
+
+    @Override
+    public Database getDbOrMetaException(String dbName) throws MetaNotFoundException {
+        return null;
+    }
+
+    @Override
+    public Database getDbOrMetaException(long dbId) throws MetaNotFoundException {
+        return null;
+    }
+
+    @Override
+    public Database getDbOrDdlException(String dbName) throws DdlException {
+        return null;
+    }
+
+    @Override
+    public Database getDbOrDdlException(long dbId) throws DdlException {
+        return null;
+    }
+
+    @Override
+    public Database getDbOrAnalysisException(String dbName) throws AnalysisException {
+        return null;
+    }
+
+    @Override
+    public Database getDbOrAnalysisException(long dbId) throws AnalysisException {
+        return null;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java
new file mode 100644
index 0000000000..50aebbacef
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.external.ExternalScanRange;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * External data source for Hive Metastore (HMS) compatible data sources; the metadata methods are still placeholders.
+ */
+public class HMSExternalDataSource extends ExternalDataSource {
+ private static final Logger LOG = LogManager.getLogger(HMSExternalDataSource.class);
+
+ public static final String HIVE_METASTORE_URIS = "hive.metastore.uris"; // not read anywhere in this class
+
+ public HMSExternalDataSource() {
+
+ }
+
+ @Override
+ public String getType() {
+ return "hms";
+ }
+
+ @Override
+ public String getName() {
+ return "hms"; // same literal as getType(): the name is hard-coded here
+ }
+
+ @Override
+ public List<String> listDatabaseNames(SessionContext ctx) {
+ return null; // placeholder: ctx is unused and no database names are listed
+ }
+
+ @Override
+ public List<String> listTableNames(SessionContext ctx, String dbName) {
+ return null; // placeholder: ctx and dbName are unused and no table names are listed
+ }
+
+ @Override
+ public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
+ return false; // placeholder: every table is reported as absent
+ }
+
+ @Override
+ public List<Column> getSchema(SessionContext ctx, String dbName, String tblName) {
+ return null; // placeholder: no schema is resolved for the given table
+ }
+
+ @Override
+ public List<ExternalScanRange> getExternalScanRanges(SessionContext ctx) {
+ return null; // placeholder: no scan ranges are produced
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
new file mode 100644
index 0000000000..17831cacd3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * The internal data source manages all self-managed meta objects in a Doris cluster,
+ * such as databases, tables, etc.
+ * There is only one internal data source in a cluster.
+ */
+public class InternalDataSource implements DataSourceIf {
+ @Override
+ public String getType() {
+ return "internal";
+ }
+
+ @Override
+ public String getName() {
+ return "_internal_";
+ }
+
+ @Nullable
+ @Override
+ public Database getDbNullable(String dbName) {
+ return null; // placeholder: dbName is never looked up, so the result is always null
+ }
+
+ @Nullable
+ @Override
+ public Database getDbNullable(long dbId) {
+ return null; // placeholder: dbId is never looked up, so the result is always null
+ }
+
+ @Override
+ public Optional<Database> getDb(String dbName) {
+ return Optional.empty(); // placeholder: always empty, dbName is not consulted
+ }
+
+ @Override
+ public Optional<Database> getDb(long dbId) {
+ return Optional.empty(); // placeholder: always empty, dbId is not consulted
+ }
+
+ @Override
+ public <E extends Exception> Database getDbOrException(String dbName, Function<String, E> e) throws E {
+ return null; // placeholder: always null; the exception factory e is never called
+ }
+
+ @Override
+ public <E extends Exception> Database getDbOrException(long dbId, Function<Long, E> e) throws E {
+ return null; // placeholder: always null; the exception factory e is never called
+ }
+
+ @Override
+ public Database getDbOrMetaException(String dbName) throws MetaNotFoundException {
+ return null; // placeholder: always null; MetaNotFoundException is declared but never thrown here
+ }
+
+ @Override
+ public Database getDbOrMetaException(long dbId) throws MetaNotFoundException {
+ return null; // placeholder: always null; MetaNotFoundException is declared but never thrown here
+ }
+
+ @Override
+ public Database getDbOrDdlException(String dbName) throws DdlException {
+ return null; // placeholder: always null; DdlException is declared but never thrown here
+ }
+
+ @Override
+ public Database getDbOrDdlException(long dbId) throws DdlException {
+ return null; // placeholder: always null; DdlException is declared but never thrown here
+ }
+
+ @Override
+ public Database getDbOrAnalysisException(String dbName) throws AnalysisException {
+ return null; // placeholder: always null; AnalysisException is declared but never thrown here
+ }
+
+ @Override
+ public Database getDbOrAnalysisException(long dbId) throws AnalysisException {
+ return null; // placeholder: always null; AnalysisException is declared but never thrown here
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaObjCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaObjCache.java
new file mode 100644
index 0000000000..82b844142e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaObjCache.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+/**
+ * MetaObjCache is intended to cache and manage all meta objects fetched from external data sources,
+ * so that we don't need to fetch the meta objects every time.
+ */
+public class MetaObjCache { // NOTE(review): no private constructor, so callers can still create extra instances
+ private static class SingletonHolder { // INSTANCE is created when this holder class is first initialized
+ private static final MetaObjCache INSTANCE = new MetaObjCache();
+ }
+
+ public static MetaObjCache get() { // returns the shared instance kept by SingletonHolder
+ return SingletonHolder.INSTANCE;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SessionContext.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SessionContext.java
new file mode 100644
index 0000000000..f4fc2393ac
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SessionContext.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+/**
+ * Holds information that may need to be passed from Doris to the external data source,
+ * such as user info, session variables, predicates, etc.
+ */
+public class SessionContext { // no fields or methods are defined yet
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/ExternalScanRange.java b/fe/fe-core/src/main/java/org/apache/doris/external/ExternalScanRange.java
new file mode 100644
index 0000000000..b82742852a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/ExternalScanRange.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.external;
+
+/**
+ * Describes the data that an ExternalScanNode needs in order to read from an external data source.
+ * For example, for Hive, an ExternalScanRange may hold information about the files to be read,
+ * such as file path, file format, start and offset, etc.
+ */
+public class ExternalScanRange { // no fields or methods are defined yet
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
new file mode 100644
index 0000000000..1d4f4b983f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import java.util.List;
+
+/**
+ * ExternalScanNode is used to unify data reading from external data sources.
+ * For this type of data source, we only access its data through the scan node:
+ * after the data of the data source is divided, the scan tasks are distributed to one or more Backends for execution.
+ * For example:
+ * hive, iceberg, hudi, es, odbc
+ */
+public class ExternalScanNode extends ScanNode {
+
+ public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, NodeType nodeType) {
+ super(id, desc, planNodeName, nodeType);
+ }
+
+ @Override
+ public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
+ return null; // placeholder: maxScanRangeLength is ignored and no scan ranges are produced
+ }
+
+ @Override
+ protected void toThrift(TPlanNode msg) { // placeholder: msg is left untouched
+
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 65642bbe4b..4f467649ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.SessionContext;
import org.apache.doris.mysql.MysqlCapability;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
@@ -135,6 +136,12 @@ public class ConnectContext {
private InsertResult insertResult;
+ private SessionContext sessionContext;
+
+ public SessionContext getSessionContext() {
+ return sessionContext; // NOTE(review): nothing in this hunk assigns sessionContext; confirm it is set elsewhere
+ }
+
public void setOrUpdateInsertResult(long txnId, String label, String db, String tbl,
TransactionStatus txnStatus, long loadedRows, int filteredRows) {
if (isTxnModel() && insertResult != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5e03947a87..566d1ab28c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1385,7 +1385,7 @@ public class Coordinator {
HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
// set scan ranges/locations for scan nodes
for (ScanNode scanNode : scanNodes) {
- // the parameters of getScanRangeLocations may ignore, It dosn't take effect
+ // the parameter of getScanRangeLocations can be ignored; it doesn't take effect
List<TScanRangeLocations> locations = scanNode.getScanRangeLocations(0);
if (locations == null) {
// only analysis olap scan node
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index f94532bcb6..1494a6529a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -213,6 +213,16 @@ struct TEsScanRange {
4: required i32 shard_id
}
+struct TFileScanRange { // placeholder: no fields are defined yet
+
+}
+
+// Scan range for an external data source, such as files on HDFS, ES data nodes, etc.
+struct TExternalScanRange {
+ 1: optional TFileScanRange file_scan_range
+ // TODO: add more scan range types?
+}
+
// Specification of an individual data range which is held in its entirety
// by a storage server
struct TScanRange {
@@ -221,6 +231,7 @@ struct TScanRange {
5: optional binary kudu_scan_token // Decrepated
6: optional TBrokerScanRange broker_scan_range
7: optional TEsScanRange es_scan_range
+ 8: optional TExternalScanRange ext_scan_range
}
struct TMySQLScanNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org