You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/01/30 07:24:03 UTC
[incubator-linkis] 06/07: define service interface and implement class
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.1.0-datasource
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit 21560839242a5200481f532de8d17ff48035a2f2
Author: xiaojie19852006 <xi...@163.com>
AuthorDate: Sun Jan 30 14:00:48 2022 +0800
define service interface and implement class
---
.../server/service/MetadataAppService.java | 79 +++++++++
.../service/impl/MetadataAppServiceImpl.java | 189 +++++++++++++++++++++
2 files changed, 268 insertions(+)
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-manager/server/src/main/java/org/apache/linkis/metadatamanager/server/service/MetadataAppService.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-manager/server/src/main/java/org/apache/linkis/metadatamanager/server/service/MetadataAppService.java
new file mode 100644
index 0000000..80d90b3
--- /dev/null
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-manager/server/src/main/java/org/apache/linkis/metadatamanager/server/service/MetadataAppService.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.metadatamanager.server.service;
+
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.metadatamanager.common.domain.MetaColumnInfo;
+import org.apache.linkis.metadatamanager.common.domain.MetaPartitionInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public interface MetadataAppService {
+
+ /**
+ * Get connection
+ * @param params connect params
+ * @return
+ */
+ void getConnection(String dataSourceType, String operator, Map<String, Object> params) throws Exception;
+
+ /**
+ * @param dataSourceId data source id
+ * @param system system
+ * @return
+ */
+ List<String> getDatabasesByDsId(String dataSourceId, String system, String userName) throws ErrorException;
+
+ /**
+ * @param dataSourceId data source id
+ * @param system system
+ * @param database database
+ * @return
+ */
+ List<String> getTablesByDsId(String dataSourceId, String database, String system, String userName) throws ErrorException;
+
+ /**
+ * @param dataSourceId data source id
+ * @param database database
+ * @param table table
+ * @param system system
+ * @return
+ */
+ Map<String, String> getTablePropsByDsId(String dataSourceId, String database, String table,
+ String system, String userName) throws ErrorException;
+ /**
+ * @param dataSourceId data source i
+ * @param database database
+ * @param table table
+ * @param system system
+ * @return
+ */
+ MetaPartitionInfo getPartitionsByDsId(String dataSourceId, String database, String table,
+ String system, String userName) throws ErrorException;
+
+ /**
+ * @param dataSourceId data source id
+ * @param database database
+ * @param table table
+ * @param system system
+ * @return
+ */
+ List<MetaColumnInfo> getColumns(String dataSourceId, String database, String table,
+ String system, String userName) throws ErrorException;
+}
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-manager/server/src/main/java/org/apache/linkis/metadatamanager/server/service/impl/MetadataAppServiceImpl.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-manager/server/src/main/java/org/apache/linkis/metadatamanager/server/service/impl/MetadataAppServiceImpl.java
new file mode 100644
index 0000000..7e2bab1
--- /dev/null
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-manager/server/src/main/java/org/apache/linkis/metadatamanager/server/service/impl/MetadataAppServiceImpl.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.metadatamanager.server.service.impl;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.datasourcemanager.common.auth.AuthContext;
+import org.apache.linkis.datasourcemanager.common.protocol.DsInfoQueryRequest;
+import org.apache.linkis.datasourcemanager.common.protocol.DsInfoResponse;
+import org.apache.linkis.metadatamanager.common.MdmConfiguration;
+import org.apache.linkis.metadatamanager.common.domain.MetaColumnInfo;
+import org.apache.linkis.metadatamanager.common.domain.MetaPartitionInfo;
+import org.apache.linkis.metadatamanager.common.exception.MetaMethodInvokeException;
+import org.apache.linkis.metadatamanager.common.exception.MetaRuntimeException;
+import org.apache.linkis.metadatamanager.common.service.MetadataConnection;
+import org.apache.linkis.metadatamanager.server.loader.MetaClassLoaderManager;
+import org.apache.linkis.metadatamanager.server.service.MetadataAppService;
+import org.apache.linkis.rpc.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.function.BiFunction;
+
+@Service
+public class MetadataAppServiceImpl implements MetadataAppService {
+ private Sender dataSourceRpcSender;
+ private MetaClassLoaderManager metaClassLoaderManager;
+ private static final Logger LOG = LoggerFactory.getLogger(MetadataAppServiceImpl.class);
+ @PostConstruct
+ public void init(){
+ dataSourceRpcSender = Sender.getSender(MdmConfiguration.DATA_SOURCE_SERVICE_APPLICATION.getValue());
+ metaClassLoaderManager = new MetaClassLoaderManager();
+ }
+
+
+ @Override
+ public void getConnection(String dataSourceType, String operator, Map<String, Object> params) throws Exception {
+ MetadataConnection<Closeable> metadataConnection = invokeMetaMethod(dataSourceType, "getConnection", new Object[]{operator, params}, Map.class);
+ if (Objects.nonNull(metadataConnection)){
+ Closeable connection = metadataConnection.getConnection();
+ try {
+ connection.close();
+ }catch(IOException e){
+ LOG.warn("Fail to close connection[关闭连接失败], [" + e.getMessage() + "]", e);
+ }
+ }
+ }
+
+ @Override
+ public List<String> getDatabasesByDsId(String dataSourceId, String system, String userName) throws ErrorException {
+ DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName);
+ if(StringUtils.isNotBlank(dsInfoResponse.dsType())){
+ return invokeMetaMethod(dsInfoResponse.dsType(), "getDatabases", new Object[]{dsInfoResponse.creator(), dsInfoResponse.params()}, List.class);
+ }
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<String> getTablesByDsId(String dataSourceId, String database, String system,
+ String userName) throws ErrorException {
+ DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName);
+ if(StringUtils.isNotBlank(dsInfoResponse.dsType())){
+ return invokeMetaMethod(dsInfoResponse.dsType(), "getTables", new Object[]{dsInfoResponse.creator(), dsInfoResponse.params(), database}, List.class);
+ }
+ return new ArrayList<>();
+ }
+
+ @Override
+ public Map<String, String> getTablePropsByDsId(String dataSourceId, String database, String table,
+ String system, String userName) throws ErrorException {
+ DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName);
+ if(StringUtils.isNotBlank(dsInfoResponse.dsType())){
+ return invokeMetaMethod(dsInfoResponse.dsType(), "getTableProps", new Object[]{dsInfoResponse.creator(), dsInfoResponse.params(), database, table},
+ Map.class);
+ }
+ return new HashMap<>();
+ }
+
+ @Override
+ public MetaPartitionInfo getPartitionsByDsId(String dataSourceId, String database, String table,
+ String system, String userName) throws ErrorException {
+ DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName);
+ if(StringUtils.isNotBlank(dsInfoResponse.dsType())){
+ return invokeMetaMethod(dsInfoResponse.dsType(), "getPartitions", new Object[]{dsInfoResponse.creator(), dsInfoResponse.params(), database, table},
+ MetaPartitionInfo.class);
+ }
+ return new MetaPartitionInfo();
+ }
+
+ @Override
+ public List<MetaColumnInfo> getColumns(String dataSourceId, String database, String table,
+ String system, String userName) throws ErrorException {
+ DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName);
+ if(StringUtils.isNotBlank(dsInfoResponse.dsType())){
+ return invokeMetaMethod(dsInfoResponse.dsType(), "getColumns", new Object[]{dsInfoResponse.creator(), dsInfoResponse.params(), database, table},
+ List.class);
+ }
+ return new ArrayList<>();
+ }
+
+ /**
+ * Request to get data source information
+ * (type and connection parameters)
+ * @param dataSourceId data source id
+ * @param system system
+ * @return
+ * @throws ErrorException
+ */
+ public DsInfoResponse reqToGetDataSourceInfo(String dataSourceId, String system,
+ String userName) throws ErrorException{
+ Object rpcResult = null;
+ try {
+ rpcResult = dataSourceRpcSender.ask(new DsInfoQueryRequest(dataSourceId, null, system));
+ }catch(Exception e){
+ throw new ErrorException(-1, "Remote Service Error[远端服务出错, 联系运维处理]");
+ }
+ if(rpcResult instanceof DsInfoResponse){
+ DsInfoResponse response = (DsInfoResponse)rpcResult;
+ if(!response.status()){
+ throw new ErrorException(-1, "Error in Data Source Manager Server[数据源服务出错]");
+ }
+ boolean hasPermission = (AuthContext.isAdministrator(userName) ||
+ (StringUtils.isNotBlank(response.creator()) && userName.equals(response.creator())));
+ if(!hasPermission){
+ throw new ErrorException(-1, "Don't have query permission for data source [没有数据源的查询权限]");
+ } else if (response.params().isEmpty()){
+ throw new ErrorException(-1, "Have you published the data source? [数据源未发布或者参数为空]");
+ }
+ return response;
+ }else{
+ throw new ErrorException(-1, "Remote Service Error[远端服务出错, 联系运维处理]");
+ }
+ }
+
+ /**
+ * Invoke method in meta service
+ * @param method method name
+ * @param methodArgs arguments
+ */
+ @SuppressWarnings("unchecked")
+ private <T>T invokeMetaMethod(String dsType,
+ String method, Object[] methodArgs,
+ Class<?> returnType) throws MetaMethodInvokeException {
+ BiFunction<String, Object[], Object> invoker;
+ try {
+ invoker = metaClassLoaderManager.getInvoker(dsType);
+ }catch (Exception e){
+ // TODO ERROR CODE
+ throw new MetaMethodInvokeException(-1, "Load meta service for " + dsType +
+ " fail 加载 [" + dsType + "] 元数据服务失败", e);
+ }
+ if (Objects.nonNull(invoker)){
+ try {
+ Object returnObj = invoker.apply(method, methodArgs);
+ if (Objects.nonNull(returnType)) {
+ return (T) returnObj;
+ }
+ }catch (Exception e){
+ if (e instanceof MetaRuntimeException){
+ throw new MetaMethodInvokeException(method, methodArgs, -1, e.getMessage(), e.getCause());
+ }
+ // TODO ERROR CODE
+ throw new MetaMethodInvokeException(method, methodArgs, -1,
+ "Invoke method [" + method +"] fail, message:[" + e.getMessage() + "]", e);
+ }
+ }
+ return null;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org