You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by "morningman (via GitHub)" <gi...@apache.org> on 2023/01/30 09:19:22 UTC

[GitHub] [doris] morningman commented on a diff in pull request #16082: [feature-wip](multi-catalog) support iceberg union catalog, and add h…

morningman commented on code in PR #16082:
URL: https://github.com/apache/doris/pull/16082#discussion_r1090241073


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/IcebergExternalCatalogFactory.java:
##########
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.DdlException;
+
+import java.util.Map;
+
+public class IcebergExternalCatalogFactory {
+
+    public static CatalogIf createCatalog(long catalogId, String name, String resource, Map<String, String> props)
+            throws DdlException {
+        String catalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
+        if (catalogType == null) {
+            throw new DdlException("Missing iceberg.catalog.type property");

Review Comment:
   ```suggestion
               throw new DdlException("Missing " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE + " property");
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/IcebergExternalCatalogFactory.java:
##########
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.DdlException;
+
+import java.util.Map;
+
+public class IcebergExternalCatalogFactory {

Review Comment:
   You could create a new package `iceberg` under `datasource` to hold all Iceberg-related classes.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/IcebergHMSExternalCatalog.java:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.HMSResource;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.hive.HiveCatalog;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
+
+    private static final String HMS_TAG = "hms";
+
+    public IcebergHMSExternalCatalog(long catalogId, String name, String resource, String catalogType,
+                                     Map<String, String> props) {
+        super(catalogId, name, catalogType);
+        catalogProperty = new CatalogProperty(resource, props);
+    }
+
+    @Override
+    protected void initLocalObjectsImpl() {
+        HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
+        Configuration conf = getConfiguration();
+        hiveCatalog.setConf(conf);
+        // initialize hive catalog
+        Map<String, String> catalogProperties = new HashMap<>();
+        String metastoreUris = catalogProperty.getOrDefault(HMSResource.HIVE_METASTORE_URIS, "");
+
+        catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, metastoreUris);
+        catalogProperties.put(CatalogProperties.URI, metastoreUris);
+        hiveCatalog.initialize(HMS_TAG, catalogProperties);
+        catalog = hiveCatalog;
+    }
+
+    private Configuration getConfiguration() {
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
+        for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        return conf;
+    }
+
+    @Override
+    public List<String> listDatabaseNames() {
+        return ((HiveCatalog) catalog).listNamespaces().stream()
+            .map(Namespace::toString).collect(Collectors.toList());

Review Comment:
   A namespace from Iceberg may be multi-level, e.g. `a.b.c`.
   Do we need to handle this, e.g. by converting it to `a_b_c`?



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/IcebergRestExternalCatalog.java:
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.rest.RESTCatalog;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
+
+    public  IcebergRestExternalCatalog(long catalogId, String name, String resource, String catalogType,
+                                       Map<String, String> props) {
+        super(catalogId, name, catalogType);
+        catalogProperty = new CatalogProperty(resource, props);
+    }
+
+    @Override
+    protected void initLocalObjectsImpl() {
+        Map<String, String> restProperties = new HashMap<>();
+        String restUri = catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, "");
+        restProperties.put(CatalogProperties.URI, restUri);
+        RESTCatalog restCatalog = new RESTCatalog();
+        restCatalog.initialize(icebergCatalogType, restProperties);
+        catalog = restCatalog;
+    }
+
+    @Override
+    public List<String> listDatabaseNames() {
+        return ((RESTCatalog) catalog).listNamespaces().stream()

Review Comment:
   Is `listNamespaces` declared on the `Catalog` interface, or is it a method specific to `RESTCatalog`?
   If it is declared on `Catalog`, I think `listDatabaseNames()` could be implemented once in `IcebergExternalCatalog` instead of in each subclass.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/IcebergExternalCatalog.java:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.external.IcebergExternalDatabase;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class IcebergExternalCatalog extends ExternalCatalog {
+
+    public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type";
+    protected final String icebergCatalogType;
+
+    protected Catalog catalog;
+
+    public IcebergExternalCatalog(long catalogId, String name, String type) {
+        super(catalogId, name);
+        this.icebergCatalogType = type;
+    }
+
+    @Override
+    protected void init() {
+        Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
+        Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
+        InitCatalogLog initCatalogLog = new InitCatalogLog();
+        initCatalogLog.setCatalogId(id);
+        initCatalogLog.setType(InitCatalogLog.Type.ICEBERG);
+        List<String> allDatabaseNames = listDatabaseNames();
+        for (String dbName : allDatabaseNames) {
+            long dbId;
+            if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
+                dbId = dbNameToId.get(dbName);
+                tmpDbNameToId.put(dbName, dbId);
+                ExternalDatabase db = idToDb.get(dbId);
+                db.setUnInitialized(invalidCacheInInit);
+                tmpIdToDb.put(dbId, db);
+                initCatalogLog.addRefreshDb(dbId);
+            } else {
+                dbId = Env.getCurrentEnv().getNextId();
+                tmpDbNameToId.put(dbName, dbId);
+                IcebergExternalDatabase db = new IcebergExternalDatabase(this, dbId, dbName);
+                tmpIdToDb.put(dbId, db);
+                initCatalogLog.addCreateDb(dbId, dbName);
+            }
+        }
+        dbNameToId = tmpDbNameToId;
+        idToDb = tmpIdToDb;
+        Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
+    }
+
+    protected Type icebergTypeToDorisTypeBeta(org.apache.iceberg.types.Type type) {
+        switch (type.typeId()) {
+            case BOOLEAN:
+                return Type.BOOLEAN;
+            case INTEGER:
+                return Type.INT;
+            case LONG:
+                return Type.BIGINT;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case STRING:
+            case BINARY:
+            case FIXED:
+            case UUID:
+                return Type.STRING;
+            case STRUCT:
+                return Type.STRUCT;
+            case LIST:
+                return Type.ARRAY;
+            case MAP:
+                return Type.MAP;
+            case DATE:
+                return Type.DATEV2;
+            case TIME:
+                return Type.TIMEV2;
+            case TIMESTAMP:
+                return Type.DATETIMEV2;
+            case DECIMAL:
+                return Type.DECIMALV2;

Review Comment:
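   Use `ScalarType.createDecimalType(precision, scale)` here. Returning the fixed `Type.DECIMALV2` discards the precision and scale of the Iceberg decimal column.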



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/IcebergExternalCatalogFactory.java:
##########
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.DdlException;
+
+import java.util.Map;
+
+public class IcebergExternalCatalogFactory {
+
+    public static CatalogIf createCatalog(long catalogId, String name, String resource, Map<String, String> props)
+            throws DdlException {
+        String catalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
+        if (catalogType == null) {
+            throw new DdlException("Missing iceberg.catalog.type property");
+        }
+        switch (catalogType) {
+            case "rest":
+                return new IcebergRestExternalCatalog(catalogId, name, resource, catalogType, props);
+            case "hms":
+                return new IcebergHMSExternalCatalog(catalogId, name, resource, catalogType, props);
+            default:
+                throw new DdlException("Unknown iceberg.catalog.type value: " + catalogType);

Review Comment:
   ```suggestion
                   throw new DdlException("Unknown " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE + " value: " + catalogType);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java:
##########
@@ -52,23 +53,23 @@
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.stream.Collectors;
 
 /**
  * A file scan provider for iceberg.
  */
-public class IcebergScanProvider extends HiveScanProvider {
+public class IcebergScanProvider extends QueryScanProvider {
 
     private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
     private final Analyzer analyzer;
+    private final IcebergSourceProvider delegate;

Review Comment:
   ```suggestion
       private final IcebergSourceProvider delegateProvider;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org