You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/02/07 06:12:47 UTC

[incubator-linkis] 03/08: add es connection

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.1.0-datasource
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit fcc54004aecaf32a7d684a554b5d85db9cff5e37
Author: xiaojie19852006 <xi...@163.com>
AuthorDate: Mon Feb 7 09:47:19 2022 +0800

    add es connection
---
 .../metadatamanager/service/ElasticConnection.java | 140 +++++++++++++++++++++
 1 file changed, 140 insertions(+)

diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-manager/service/elasticsearch/src/main/java/org/apache/linkis/metadatamanager/service/ElasticConnection.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-manager/service/elasticsearch/src/main/java/org/apache/linkis/metadatamanager/service/ElasticConnection.java
new file mode 100644
index 0000000..7b6630e
--- /dev/null
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-manager/service/elasticsearch/src/main/java/org/apache/linkis/metadatamanager/service/ElasticConnection.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.linkis.metadatamanager.service;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.linkis.datasourcemanager.common.util.json.Json;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ElasticConnection implements Closeable {
+
+    public static final String DEFAULT_TYPE_NAME = "type";
+
+    private static final String DEFAULT_MAPPING_NAME = "mappings";
+    private static final String DEFAULT_INDEX_NAME = "index";
+    private static final String FIELD_PROPS = "properties";
+
+    private RestClient restClient;
+
+    public ElasticConnection(String[] endPoints, String username, String password) throws IOException {
+        HttpHost[] httpHosts = new HttpHost[endPoints.length];
+        for(int i = 0; i < endPoints.length; i++){
+            httpHosts[i] = HttpHost.create(endPoints[i]);
+        }
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
+        CredentialsProvider credentialsProvider = null;
+        if(StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)){
+            credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(AuthScope.ANY,
+                    new UsernamePasswordCredentials(username, password));
+        }
+        //set only one thread
+        CredentialsProvider finalCredentialsProvider = credentialsProvider;
+        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
+            if(null != finalCredentialsProvider){
+                httpClientBuilder
+                        .setDefaultCredentialsProvider(finalCredentialsProvider);
+            }
+            return httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
+        });
+        this.restClient = restClientBuilder.build();
+        //Try to test connection
+        ping();
+    }
+
+    public List<String> getAllIndices() throws Exception {
+        List<String> indices = new ArrayList<>();
+        Request request = new Request("GET", "_cat/indices");
+        request.addParameter("format", "JSON");
+        Response response = restClient.performRequest(request);
+        List<Map<String, Object>> list = Json.fromJson(response.getEntity().getContent(), Map.class);
+        list.forEach( v ->{
+            String index = String.valueOf(v.getOrDefault(DEFAULT_INDEX_NAME, ""));
+            if(StringUtils.isNotBlank(index) && !index.startsWith(".")){
+                indices.add(index);
+            }
+        });
+        return indices;
+    }
+
+    public List<String> getTypes(String index) throws Exception{
+        List<String> types = new ArrayList<>();
+        Request request = new Request("GET", index +"/_mappings");
+        Response response = restClient.performRequest(request);
+        Map<String, Map<String, Object>> result =
+                Json.fromJson(response.getEntity().getContent(), Map.class);
+        Map<String, Object> indexMap = result.get(index);
+        Object props = indexMap.get(DEFAULT_MAPPING_NAME);
+        if(props instanceof Map){
+            Set keySet = ((Map)props).keySet();
+            for(Object v : keySet){
+                types.add(String.valueOf(v));
+            }
+        }
+        return types;
+    }
+
+    public Map<Object, Object> getProps(String index, String type) throws Exception{
+        Request request = new Request("GET", index + "/_mappings/" + type);
+        Response response = restClient.performRequest(request);
+        Map<String, Map<String, Object>> result =
+                Json.fromJson(response.getEntity().getContent(), Map.class);
+        Map mappings = (Map)result.get(index).get("mappings");
+        Map propsMap = mappings;
+        if(mappings.containsKey(type)){
+            Object typeMap = mappings.get(type);
+            if(typeMap instanceof Map){
+                propsMap = (Map)typeMap;
+            }
+        }
+        Object props = propsMap.get(FIELD_PROPS);
+        if(props instanceof Map){
+            return (Map)props;
+        }
+        return null;
+    }
+    public void ping() throws IOException{
+        Response response = restClient.performRequest(new Request("GET", "/"));
+        int code = response.getStatusLine().getStatusCode();
+        int successCode = 200;
+        if(code != successCode){
+            throw new RuntimeException("Ping to ElasticSearch ERROR, response code: " + code);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.restClient.close();
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org