You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/20 03:46:51 UTC

[inlong] 01/03: [INLONG-7273][Manager] Support creating table in Kudu cluster (#7274)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit eda50fe7304f0213ffe6f2adc9f8a1602c0b3b0c
Author: feat <fe...@outlook.com>
AuthorDate: Mon Mar 20 09:37:39 2023 +0800

    [INLONG-7273][Manager] Support creating table in Kudu cluster (#7274)
---
 .../inlong/manager/pojo/sink/kudu/KuduSink.java    |   3 +
 .../inlong/manager/pojo/sink/kudu/KuduSinkDTO.java |   5 +
 .../manager/pojo/sink/kudu/KuduSinkRequest.java    |   3 +
 .../manager/pojo/sink/kudu/KuduTableInfo.java      |   1 +
 inlong-manager/manager-service/pom.xml             |   5 +
 .../resource/sink/kudu/KuduResourceClient.java     | 179 +++++++++++++++++++++
 .../resource/sink/kudu/KuduResourceOperator.java   | 167 +++++++++++++++++++
 pom.xml                                            |   7 +
 8 files changed, 370 insertions(+)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
index 871ce2f98..8000b2314 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
@@ -51,6 +51,9 @@ public class KuduSink extends StreamSink {
     @ApiModelProperty("Partition field list")
     private String partitionKey;
 
+    @ApiModelProperty("Buckets for the newly created table")
+    private Integer buckets;
+
     public KuduSink() {
         this.setSinkType(SinkType.KUDU);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
index e7351d81c..7e2bdb883 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
@@ -51,6 +51,9 @@ public class KuduSinkDTO {
     @ApiModelProperty("Partition field list")
     private String partitionKey;
 
+    @ApiModelProperty("Buckets for the newly created table")
+    private Integer buckets;
+
     /**
      * Get the dto instance from the request
      */
@@ -59,6 +62,7 @@ public class KuduSinkDTO {
                 .tableName(request.getTableName())
                 .masters(request.getMasters())
                 .properties(request.getProperties())
+                .buckets(request.getBuckets())
                 .build();
     }
 
@@ -80,6 +84,7 @@ public class KuduSinkDTO {
         tableInfo.setMasters(kuduInfo.getMasters());
         tableInfo.setColumns(columnList);
         tableInfo.setTblProperties(kuduInfo.getProperties());
+        tableInfo.setBuckets(kuduInfo.getBuckets());
         return tableInfo;
     }
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
index ab6592f96..23307f669 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
@@ -42,4 +42,7 @@ public class KuduSinkRequest extends SinkRequest {
     @ApiModelProperty("Target table name")
     private String tableName;
 
+    @ApiModelProperty("Buckets for the newly created table")
+    private Integer buckets;
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
index 46372b996..6e3e1189c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
@@ -31,6 +31,7 @@ public class KuduTableInfo {
     private String masters;
     private String tableName;
     private String tableDesc;
+    private Integer buckets;
     private Map<String, Object> tblProperties;
     private List<KuduColumnInfo> columns;
 
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index 880eaf4ad..bc3d0465f 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -561,5 +561,10 @@
             <groupId>org.apache.hudi</groupId>
             <artifactId>hudi-flink1.13-bundle</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-client</artifactId>
+        </dependency>
+
     </dependencies>
 </project>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceClient.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceClient.java
new file mode 100644
index 000000000..d03808a0a
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceClient.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.kudu;
+
+import org.apache.inlong.manager.pojo.sink.kudu.KuduColumnInfo;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduTableInfo;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduType;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.AlterTableOptions;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Thin wrapper around a {@link KuduClient} that lets the manager check whether a table exists,
+ * create it, list its columns and append new columns.
+ */
+public class KuduResourceClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduResourceClient.class);
+
+    private static final String PARTITION_STRATEGY_HASH = "HASH";
+    private static final String PARTITION_STRATEGY_PRIMARY_KEY = "PrimaryKey";
+    private static final int DEFAULT_BUCKETS = 6;
+
+    private final KuduClient client;
+
+    /**
+     * Build a client connected to the given Kudu master address(es).
+     *
+     * @param kuduMaster master address string handed to {@link KuduClient.KuduClientBuilder}
+     */
+    public KuduResourceClient(String kuduMaster) {
+        this.client = new KuduClient.KuduClientBuilder(kuduMaster).build();
+    }
+
+    /**
+     * Check whether the table with the given name exists.
+     *
+     * @param tableName name of the table to look up
+     * @return true if the table exists
+     * @throws KuduException if the lookup fails
+     */
+    public boolean tableExist(String tableName) throws KuduException {
+        return client.tableExists(tableName);
+    }
+
+    /**
+     * Create table with given tableName and tableInfo.
+     *
+     * <p>Columns whose partition strategy is HASH or PrimaryKey become key columns and are placed
+     * before all other columns in the schema. HASH columns are additionally used for hash
+     * partitioning, with {@link KuduTableInfo#getBuckets()} buckets (or {@value #DEFAULT_BUCKETS}
+     * when no bucket count is configured).
+     *
+     * @param tableName name of the table to create
+     * @param tableInfo table description providing the columns and the bucket count
+     * @throws KuduException if the Kudu cluster rejects the request
+     */
+    public void createTable(String tableName, KuduTableInfo tableInfo) throws KuduException {
+        // Parse column from tableInfo.
+        List<KuduColumnInfo> columns = tableInfo.getColumns();
+        // Key columns have to come first in a Kudu schema. The sort is stable and tolerates columns
+        // that have no partition strategy at all (ordinary, non-key columns).
+        List<ColumnSchema> kuduColumns = columns.stream()
+                .sorted(Comparator.comparingInt(KuduResourceClient::schemaOrder))
+                .map(this::buildColumnSchema)
+                .collect(Collectors.toList());
+
+        // Build schema.
+        Schema schema = new Schema(kuduColumns);
+
+        // Create table options: hash partitioning over the HASH columns, if there are any.
+        CreateTableOptions options = new CreateTableOptions();
+        List<String> hashColumns = columns.stream()
+                .filter(column -> PARTITION_STRATEGY_HASH.equalsIgnoreCase(column.getPartitionStrategy()))
+                .map(KuduColumnInfo::getFieldName)
+                .collect(Collectors.toList());
+        if (!hashColumns.isEmpty()) {
+            Integer configuredBuckets = tableInfo.getBuckets();
+            int buckets = configuredBuckets == null ? DEFAULT_BUCKETS : configuredBuckets;
+            options.addHashPartitions(hashColumns, buckets);
+        }
+
+        // Create table by KuduClient.
+        client.createTable(tableName, schema, options);
+    }
+
+    /**
+     * Rank used to order columns inside the schema: HASH columns first, then PrimaryKey columns,
+     * then every other column (including those without a partition strategy).
+     */
+    private static int schemaOrder(KuduColumnInfo columnInfo) {
+        String partitionStrategy = columnInfo.getPartitionStrategy();
+        if (PARTITION_STRATEGY_HASH.equalsIgnoreCase(partitionStrategy)) {
+            return 0;
+        }
+        if (PARTITION_STRATEGY_PRIMARY_KEY.equalsIgnoreCase(partitionStrategy)) {
+            return 1;
+        }
+        return 2;
+    }
+
+    /**
+     * Tell whether a column with the given partition strategy is part of the primary key.
+     */
+    private static boolean isKeyStrategy(String partitionStrategy) {
+        return PARTITION_STRATEGY_HASH.equalsIgnoreCase(partitionStrategy)
+                || PARTITION_STRATEGY_PRIMARY_KEY.equalsIgnoreCase(partitionStrategy);
+    }
+
+    /**
+     * Convert a column description into a Kudu column schema used when creating a table.
+     */
+    private ColumnSchema buildColumnSchema(KuduColumnInfo columnInfo) {
+        String name = columnInfo.getFieldName();
+        String type = columnInfo.getFieldType();
+        String desc = columnInfo.getFieldComment();
+
+        String kuduType = KuduType.forType(type).kuduType();
+        Type typeForName = Type.getTypeForName(kuduType);
+
+        ColumnSchemaBuilder builder = new ColumnSchemaBuilder(name, typeForName)
+                .comment(desc);
+
+        // Build schema: the hash key and partition key must be primary key.
+        if (isKeyStrategy(columnInfo.getPartitionStrategy())) {
+            builder.key(true);
+        } else {
+            // Key columns must not be nullable, so only non-key columns are marked nullable.
+            builder.nullable(true);
+        }
+        return builder.build();
+    }
+
+    /**
+     * Build a nullable, non-key column schema; used for columns appended to an existing table.
+     */
+    private ColumnSchema buildColumnSchema(
+            String name,
+            String desc,
+            Type typeForName) {
+
+        ColumnSchemaBuilder builder = new ColumnSchemaBuilder(name, typeForName)
+                .comment(desc)
+                .nullable(true);
+        return builder.build();
+    }
+
+    /**
+     * List the columns of an existing table.
+     *
+     * <p>Only the field name, field type and field comment of each returned column are populated.
+     *
+     * @param tableName name of the table to open
+     * @return one entry per column of the table schema
+     * @throws KuduException if the table cannot be opened
+     */
+    public List<KuduColumnInfo> getColumns(String tableName) throws KuduException {
+        // Open table and get column information.
+        KuduTable kuduTable = client.openTable(tableName);
+        List<ColumnSchema> columns = kuduTable.getSchema().getColumns();
+        return columns
+                .stream()
+                .map(columnSchema -> {
+                    String comment = columnSchema.getComment();
+                    Type type = columnSchema.getType();
+                    String name = columnSchema.getName();
+
+                    String javaType = KuduType.forKuduType(type.getName()).getType();
+
+                    KuduColumnInfo columnInfo = new KuduColumnInfo();
+                    columnInfo.setFieldName(name);
+                    columnInfo.setFieldType(javaType);
+                    columnInfo.setFieldComment(comment);
+
+                    return columnInfo;
+                })
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Append the given columns to an existing table; every added column is nullable.
+     *
+     * @param tableName name of the table to alter
+     * @param needAddColumns columns to add
+     * @throws KuduException if the table cannot be opened or altered
+     */
+    public void addColumns(String tableName,
+            List<KuduColumnInfo> needAddColumns)
+            throws KuduException {
+        // Open the table with the existing client.
+        KuduTable table = client.openTable(tableName);
+
+        // Add new column to table
+        AlterTableOptions alterOptions = new AlterTableOptions();
+        for (KuduColumnInfo columnInfo : needAddColumns) {
+            String name = columnInfo.getFieldName();
+            String type = columnInfo.getFieldType();
+            String desc = columnInfo.getFieldComment();
+
+            String kuduType = KuduType.forType(type).kuduType();
+            Type typeForName = Type.getTypeForName(kuduType);
+
+            ColumnSchema columnSchema = buildColumnSchema(name, desc, typeForName);
+
+            alterOptions.addColumn(columnSchema);
+        }
+        client.alterTable(table.getName(), alterOptions);
+    }
+
+    /**
+     * Close the KuduClient owned by this instance; a failure to close is logged, not rethrown.
+     */
+    public void close() {
+        try {
+            client.close();
+        } catch (KuduException e) {
+            LOG.error("Can not properly close kuduClient.", e);
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceOperator.java
new file mode 100644
index 000000000..cf11d1f18
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceOperator.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.kudu;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduColumnInfo;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSinkDTO;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Kudu resource operator: creates the Kudu table described by a sink, or adds the missing
+ * columns when the table already exists.
+ */
+@Service
+public class KuduResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KuduResourceOperator.class);
+
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private StreamSinkFieldEntityMapper sinkFieldMapper;
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
+
+    /**
+     * Accept only sinks of type {@link SinkType#KUDU}.
+     */
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.KUDU.equals(sinkType);
+    }
+
+    /**
+     * Create Kudu table according to the sink config.
+     *
+     * <p>Nothing is done when the sink info is null, when the sink is already configured
+     * successfully, or when resource creation is disabled for the sink.
+     *
+     * @param sinkInfo sink whose Kudu table should be created or updated
+     */
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        if (sinkInfo == null) {
+            LOGGER.warn("sink info was null, skip to create resource");
+            return;
+        }
+
+        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOGGER.warn("sink resource [{}] already success, skip to create", sinkInfo.getId());
+            return;
+        }
+        if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+            LOGGER.warn("create resource was disabled, skip to create for [{}]", sinkInfo.getId());
+            return;
+        }
+
+        this.createTableIfAbsent(sinkInfo);
+    }
+
+    /**
+     * Parse the Kudu specific settings from the sink's ext params.
+     */
+    private KuduSinkDTO getKuduInfo(SinkInfo sinkInfo) {
+        return KuduSinkDTO.getFromJson(sinkInfo.getExtParams());
+    }
+
+    /**
+     * Create the table if it does not exist, otherwise add the columns that are configured on the
+     * sink but missing from the table, and record the outcome as the sink status.
+     *
+     * @throws IllegalArgumentException if the sink has no columns
+     * @throws WorkflowException if talking to Kudu or updating the status fails
+     */
+    private void createTableIfAbsent(SinkInfo sinkInfo) {
+        LOGGER.info("begin to create kudu table for sinkInfo={}", sinkInfo);
+
+        // Get all info from config
+        KuduSinkDTO kuduInfo = getKuduInfo(sinkInfo);
+        List<KuduColumnInfo> columnInfoList = getColumnList(sinkInfo);
+        if (CollectionUtils.isEmpty(columnInfoList)) {
+            throw new IllegalArgumentException("no kudu columns specified");
+        }
+        KuduTableInfo tableInfo = KuduSinkDTO.getKuduTableInfo(kuduInfo, columnInfoList);
+
+        String masters = kuduInfo.getMasters();
+        String tableName = kuduInfo.getTableName();
+
+        KuduResourceClient client = null;
+        try {
+            // 1. create client
+            client = new KuduResourceClient(masters);
+
+            // 2. check if the table exists
+            boolean tableExists = client.tableExist(tableName);
+
+            if (!tableExists) {
+                // 3. create table
+                client.createTable(tableName, tableInfo);
+            } else {
+                // 4. or update table columns
+                List<KuduColumnInfo> existColumns = client.getColumns(tableName);
+                Set<String> existColumnNameSet = existColumns.stream().map(SinkField::getFieldName)
+                        .collect(Collectors.toSet());
+                // Get columns need added according to column name.
+                List<KuduColumnInfo> needAddColumns = tableInfo.getColumns().stream()
+                        .filter(columnInfo -> !existColumnNameSet.contains(columnInfo.getFieldName()))
+                        .collect(toList());
+                if (CollectionUtils.isNotEmpty(needAddColumns)) {
+                    client.addColumns(tableName, needAddColumns);
+                    LOGGER.info("{} columns added for kudu table {}", needAddColumns.size(), tableName);
+                }
+            }
+            String info = "success to create Kudu resource";
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            // Log the sink itself; the placeholder must not be filled with the message text again.
+            LOGGER.info("{} for sinkInfo = {}", info, sinkInfo);
+        } catch (Throwable e) {
+            String errMsg = "create Kudu table failed: " + e.getMessage();
+            // The full stack trace is kept in the log because only the message is propagated.
+            LOGGER.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+    /**
+     * Load the sink fields and convert each of them into a {@link KuduColumnInfo}.
+     *
+     * <p>A field with non-blank ext params starts from the column parsed out of that JSON; any
+     * other field starts from an empty column. The field's own properties are then copied on top.
+     */
+    private List<KuduColumnInfo> getColumnList(SinkInfo sinkInfo) {
+        List<StreamSinkFieldEntity> fieldList = sinkFieldMapper.selectBySinkId(sinkInfo.getId());
+
+        // set columns
+        List<KuduColumnInfo> columnList = new ArrayList<>();
+        for (StreamSinkFieldEntity field : fieldList) {
+            KuduColumnInfo kuduColumnInfo = StringUtils.isNotBlank(field.getExtParams())
+                    ? KuduColumnInfo.getFromJson(field.getExtParams())
+                    : new KuduColumnInfo();
+            CommonBeanUtils.copyProperties(field, kuduColumnInfo, true);
+            columnList.add(kuduColumnInfo);
+        }
+
+        return columnList;
+    }
+}
diff --git a/pom.xml b/pom.xml
index a5759686d..900019a50 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1114,6 +1114,13 @@
                 <version>${hudi.version}</version>
             </dependency>
 
+            <!-- kudu -->
+            <dependency>
+                <groupId>org.apache.kudu</groupId>
+                <artifactId>kudu-client</artifactId>
+                <version>${kudu.version}</version>
+            </dependency>
+
             <!-- lombok -->
             <dependency>
                 <groupId>org.projectlombok</groupId>