You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/20 03:46:51 UTC
[inlong] 01/03: [INLONG-7273][Manager] Support creating table in Kudu cluster (#7274)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit eda50fe7304f0213ffe6f2adc9f8a1602c0b3b0c
Author: feat <fe...@outlook.com>
AuthorDate: Mon Mar 20 09:37:39 2023 +0800
[INLONG-7273][Manager] Support creating table in Kudu cluster (#7274)
---
.../inlong/manager/pojo/sink/kudu/KuduSink.java | 3 +
.../inlong/manager/pojo/sink/kudu/KuduSinkDTO.java | 5 +
.../manager/pojo/sink/kudu/KuduSinkRequest.java | 3 +
.../manager/pojo/sink/kudu/KuduTableInfo.java | 1 +
inlong-manager/manager-service/pom.xml | 5 +
.../resource/sink/kudu/KuduResourceClient.java | 179 +++++++++++++++++++++
.../resource/sink/kudu/KuduResourceOperator.java | 167 +++++++++++++++++++
pom.xml | 7 +
8 files changed, 370 insertions(+)
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
index 871ce2f98..8000b2314 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
@@ -51,6 +51,9 @@ public class KuduSink extends StreamSink {
@ApiModelProperty("Partition field list")
private String partitionKey;
+ @ApiModelProperty("Buckets for the newly created table")
+ private Integer buckets;
+
public KuduSink() {
this.setSinkType(SinkType.KUDU);
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
index e7351d81c..7e2bdb883 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
@@ -51,6 +51,9 @@ public class KuduSinkDTO {
@ApiModelProperty("Partition field list")
private String partitionKey;
+ @ApiModelProperty("Buckets for the newly created table")
+ private Integer buckets;
+
/**
* Get the dto instance from the request
*/
@@ -59,6 +62,7 @@ public class KuduSinkDTO {
.tableName(request.getTableName())
.masters(request.getMasters())
.properties(request.getProperties())
+ .buckets(request.getBuckets())
.build();
}
@@ -80,6 +84,7 @@ public class KuduSinkDTO {
tableInfo.setMasters(kuduInfo.getMasters());
tableInfo.setColumns(columnList);
tableInfo.setTblProperties(kuduInfo.getProperties());
+ tableInfo.setBuckets(kuduInfo.getBuckets());
return tableInfo;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
index ab6592f96..23307f669 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
@@ -42,4 +42,7 @@ public class KuduSinkRequest extends SinkRequest {
@ApiModelProperty("Target table name")
private String tableName;
+ @ApiModelProperty("Buckets for the newly created table")
+ private Integer buckets;
+
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
index 46372b996..6e3e1189c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
@@ -31,6 +31,7 @@ public class KuduTableInfo {
private String masters;
private String tableName;
private String tableDesc;
+ private Integer buckets;
private Map<String, Object> tblProperties;
private List<KuduColumnInfo> columns;
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index 880eaf4ad..bc3d0465f 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -561,5 +561,10 @@
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.13-bundle</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-client</artifactId>
+ </dependency>
+
</dependencies>
</project>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceClient.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceClient.java
new file mode 100644
index 000000000..d03808a0a
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceClient.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.kudu;
+
+import org.apache.inlong.manager.pojo.sink.kudu.KuduColumnInfo;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduTableInfo;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduType;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.AlterTableOptions;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * The resourceClient for Kudu.
+ */
+/**
+ * Resource client for Kudu.
+ *
+ * <p>Wraps a {@link KuduClient} built for the given master address(es) and exposes the table
+ * operations needed by the Kudu sink resource operator: existence check, table creation,
+ * column listing and column addition. Each instance owns its {@link KuduClient}, so callers
+ * must {@link #close()} it when finished.
+ */
+public class KuduResourceClient implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduResourceClient.class);
+
+    private static final String PARTITION_STRATEGY_HASH = "HASH";
+    private static final String PARTITION_STRATEGY_PRIMARY_KEY = "PrimaryKey";
+    private static final int DEFAULT_BUCKETS = 6;
+
+    // Sort ranks used to order columns in the schema: hash-partition key columns first,
+    // then the remaining primary-key columns, then ordinary (nullable) columns.
+    private static final int RANK_HASH_KEY = 0;
+    private static final int RANK_PRIMARY_KEY = 1;
+    private static final int RANK_NON_KEY = 2;
+
+    private final KuduClient client;
+
+    /**
+     * Create a client connected to the given Kudu master(s).
+     *
+     * @param kuduMaster master address string handed to {@code KuduClient.KuduClientBuilder}
+     */
+    public KuduResourceClient(String kuduMaster) {
+        this.client = new KuduClient.KuduClientBuilder(kuduMaster).build();
+    }
+
+    /**
+     * Check whether a table with the given name exists.
+     *
+     * @param tableName Kudu table name
+     * @return true if the table exists
+     * @throws KuduException if the master cannot be queried
+     */
+    public boolean tableExist(String tableName) throws KuduException {
+        return client.tableExists(tableName);
+    }
+
+    /**
+     * Create table with given tableName and tableInfo.
+     *
+     * <p>Columns whose partition strategy is {@code HASH} or {@code PrimaryKey} (case-insensitive)
+     * become primary-key columns and are placed at the front of the schema, as Kudu requires.
+     * The table is hash-partitioned on the {@code HASH} columns; when no column is marked
+     * {@code HASH}, it is hash-partitioned on all primary-key columns instead, because the Kudu
+     * client refuses to create a table without any partitioning. The bucket count is the
+     * table's {@code buckets} value, or {@value #DEFAULT_BUCKETS} when that is null.
+     *
+     * @param tableName name of the table to create
+     * @param tableInfo table definition, including its columns and optional bucket count
+     * @throws IllegalArgumentException if no column is marked as a key column
+     * @throws KuduException if Kudu rejects the request
+     */
+    public void createTable(String tableName, KuduTableInfo tableInfo) throws KuduException {
+        // Stream.sorted is stable, so key columns move to the front while each group keeps its
+        // configured order. Ranking (instead of comparing the raw strategy strings) also copes
+        // with columns that have no partition strategy at all (null).
+        List<KuduColumnInfo> orderedColumns = tableInfo.getColumns().stream()
+                .sorted(Comparator.comparingInt(KuduResourceClient::keyRank))
+                .collect(Collectors.toList());
+
+        List<ColumnSchema> kuduColumns = orderedColumns.stream()
+                .map(this::buildColumnSchema)
+                .collect(Collectors.toList());
+        Schema schema = new Schema(kuduColumns);
+
+        List<String> partitionColumns = orderedColumns.stream()
+                .filter(column -> keyRank(column) == RANK_HASH_KEY)
+                .map(KuduColumnInfo::getFieldName)
+                .collect(Collectors.toList());
+        if (partitionColumns.isEmpty()) {
+            // No explicit hash columns: partition on the whole primary key so that the table
+            // still has a partitioning scheme.
+            partitionColumns = orderedColumns.stream()
+                    .filter(KuduResourceClient::isKeyColumn)
+                    .map(KuduColumnInfo::getFieldName)
+                    .collect(Collectors.toList());
+        }
+        if (partitionColumns.isEmpty()) {
+            throw new IllegalArgumentException("no primary key column specified for kudu table " + tableName);
+        }
+
+        Integer partitionNum = tableInfo.getBuckets();
+        int buckets = partitionNum == null ? DEFAULT_BUCKETS : partitionNum;
+        CreateTableOptions options = new CreateTableOptions();
+        options.addHashPartitions(partitionColumns, buckets);
+
+        client.createTable(tableName, schema, options);
+    }
+
+    /**
+     * Rank a column for schema ordering: 0 for HASH, 1 for PrimaryKey, 2 for anything else
+     * (including a null strategy).
+     */
+    private static int keyRank(KuduColumnInfo column) {
+        String strategy = column.getPartitionStrategy();
+        if (PARTITION_STRATEGY_HASH.equalsIgnoreCase(strategy)) {
+            return RANK_HASH_KEY;
+        }
+        if (PARTITION_STRATEGY_PRIMARY_KEY.equalsIgnoreCase(strategy)) {
+            return RANK_PRIMARY_KEY;
+        }
+        return RANK_NON_KEY;
+    }
+
+    /** Whether the column belongs to the primary key (HASH or PrimaryKey strategy). */
+    private static boolean isKeyColumn(KuduColumnInfo column) {
+        return keyRank(column) != RANK_NON_KEY;
+    }
+
+    /** Resolve an InLong field type to the corresponding Kudu {@link Type}. */
+    private static Type toKuduType(String fieldType) {
+        String kuduType = KuduType.forType(fieldType).kuduType();
+        return Type.getTypeForName(kuduType);
+    }
+
+    /**
+     * Build the Kudu column schema for a column of a new table: key columns are marked as
+     * primary key, all other columns are nullable.
+     */
+    private ColumnSchema buildColumnSchema(KuduColumnInfo columnInfo) {
+        ColumnSchemaBuilder builder =
+                new ColumnSchemaBuilder(columnInfo.getFieldName(), toKuduType(columnInfo.getFieldType()))
+                        .comment(columnInfo.getFieldComment());
+
+        if (isKeyColumn(columnInfo)) {
+            // The hash key and partition key must be part of the primary key,
+            // and primary-key columns must not be nullable.
+            builder.key(true);
+        } else {
+            builder.nullable(true);
+        }
+        return builder.build();
+    }
+
+    /**
+     * Build a nullable, non-key column schema, used for columns added to an existing table.
+     */
+    private ColumnSchema buildColumnSchema(
+            String name,
+            String desc,
+            Type typeForName) {
+
+        ColumnSchemaBuilder builder = new ColumnSchemaBuilder(name, typeForName)
+                .comment(desc)
+                .nullable(true);
+        return builder.build();
+    }
+
+    /**
+     * Read the columns of an existing table, mapped back to InLong field types.
+     *
+     * @param tableName Kudu table name
+     * @return one {@link KuduColumnInfo} per table column, carrying name, type and comment
+     * @throws KuduException if the table cannot be opened
+     */
+    public List<KuduColumnInfo> getColumns(String tableName) throws KuduException {
+        KuduTable kuduTable = client.openTable(tableName);
+        List<ColumnSchema> columns = kuduTable.getSchema().getColumns();
+        return columns
+                .stream()
+                .map(columnSchema -> {
+                    String comment = columnSchema.getComment();
+                    Type type = columnSchema.getType();
+                    String name = columnSchema.getName();
+
+                    String javaType = KuduType.forKuduType(type.getName()).getType();
+
+                    KuduColumnInfo columnInfo = new KuduColumnInfo();
+                    columnInfo.setFieldName(name);
+                    columnInfo.setFieldType(javaType);
+                    columnInfo.setFieldComment(comment);
+
+                    return columnInfo;
+                })
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Add the given columns (as nullable, non-key columns) to an existing table.
+     * Does nothing when {@code needAddColumns} is null or empty.
+     *
+     * @param tableName Kudu table name
+     * @param needAddColumns columns to add
+     * @throws KuduException if the table cannot be opened or altered
+     */
+    public void addColumns(String tableName,
+            List<KuduColumnInfo> needAddColumns)
+            throws KuduException {
+        if (needAddColumns == null || needAddColumns.isEmpty()) {
+            // Nothing to alter; avoid sending an alter request without any step.
+            return;
+        }
+        KuduTable table = client.openTable(tableName);
+
+        AlterTableOptions alterOptions = new AlterTableOptions();
+        for (KuduColumnInfo columnInfo : needAddColumns) {
+            Type typeForName = toKuduType(columnInfo.getFieldType());
+            ColumnSchema columnSchema =
+                    buildColumnSchema(columnInfo.getFieldName(), columnInfo.getFieldComment(), typeForName);
+            alterOptions.addColumn(columnSchema);
+        }
+        client.alterTable(table.getName(), alterOptions);
+    }
+
+    /**
+     * Close KuduClient bundled this instance. Failures are logged, not thrown.
+     */
+    @Override
+    public void close() {
+        try {
+            client.close();
+        } catch (KuduException e) {
+            LOG.error("Can not properly close kuduClient.", e);
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceOperator.java
new file mode 100644
index 000000000..cf11d1f18
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kudu/KuduResourceOperator.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.kudu;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduColumnInfo;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSinkDTO;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.kudu.client.KuduException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Kudu resource operator
+ */
+@Service
+public class KuduResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KuduResourceOperator.class);
+
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private StreamSinkFieldEntityMapper sinkFieldMapper;
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.KUDU.equals(sinkType);
+    }
+
+    /**
+     * Create Kudu table according to the sink config.
+     *
+     * <p>Skips (with a warning) a null sink, a sink whose resource is already configured
+     * successfully, and a sink whose resource creation is disabled.
+     */
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        if (sinkInfo == null) {
+            LOGGER.warn("sink info was null, skip to create resource");
+            return;
+        }
+
+        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOGGER.warn("sink resource [{}] already success, skip to create", sinkInfo.getId());
+            return;
+        }
+        if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+            LOGGER.warn("create resource was disabled, skip to create for [{}]", sinkInfo.getId());
+            return;
+        }
+
+        this.createTableIfAbsent(sinkInfo);
+    }
+
+    /** Parse the Kudu-specific sink settings stored in the sink's extParams. */
+    private KuduSinkDTO getKuduInfo(SinkInfo sinkInfo) {
+        return KuduSinkDTO.getFromJson(sinkInfo.getExtParams());
+    }
+
+    /**
+     * Create the Kudu table when it does not exist yet, otherwise add the configured columns
+     * that the table is missing. Updates the sink status to CONFIG_SUCCESSFUL or CONFIG_FAILED.
+     *
+     * @throws IllegalArgumentException if the sink has no fields configured
+     * @throws WorkflowException if any step of the Kudu interaction fails
+     */
+    private void createTableIfAbsent(SinkInfo sinkInfo) {
+        LOGGER.info("begin to create kudu table for sinkInfo={}", sinkInfo);
+
+        // Get all info from config
+        KuduSinkDTO kuduInfo = getKuduInfo(sinkInfo);
+        List<KuduColumnInfo> columnInfoList = getColumnList(sinkInfo);
+        if (CollectionUtils.isEmpty(columnInfoList)) {
+            throw new IllegalArgumentException("no kudu columns specified");
+        }
+        KuduTableInfo tableInfo = KuduSinkDTO.getKuduTableInfo(kuduInfo, columnInfoList);
+
+        String masters = kuduInfo.getMasters();
+        String tableName = kuduInfo.getTableName();
+
+        KuduResourceClient client = null;
+        try {
+            client = new KuduResourceClient(masters);
+
+            if (!client.tableExist(tableName)) {
+                client.createTable(tableName, tableInfo);
+            } else {
+                addMissingColumns(client, tableName, tableInfo.getColumns());
+            }
+            String info = "success to create Kudu resource";
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            LOGGER.info("{} for sinkId={}, tableName={}", info, sinkInfo.getId(), tableName);
+        } catch (Throwable e) {
+            String errMsg = "create Kudu table failed: " + e.getMessage();
+            LOGGER.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
+        } finally {
+            // The resource client owns a KuduClient connection; always release it.
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+    /**
+     * Add to an existing table every configured column whose name is not present yet.
+     * Columns are matched by name only; existing columns are never altered or dropped.
+     */
+    private void addMissingColumns(KuduResourceClient client, String tableName,
+            List<KuduColumnInfo> configuredColumns) throws KuduException {
+        Set<String> existColumnNameSet = client.getColumns(tableName).stream()
+                .map(SinkField::getFieldName)
+                .collect(Collectors.toSet());
+        List<KuduColumnInfo> needAddColumns = configuredColumns.stream()
+                .filter(columnInfo -> !existColumnNameSet.contains(columnInfo.getFieldName()))
+                .collect(toList());
+        if (CollectionUtils.isNotEmpty(needAddColumns)) {
+            client.addColumns(tableName, needAddColumns);
+            LOGGER.info("{} columns added for kudu table {}", needAddColumns.size(), tableName);
+        }
+    }
+
+    /**
+     * Load the sink's fields and convert them to Kudu column infos. When a field has extParams,
+     * they are deserialized as a {@link KuduColumnInfo} first; the field's own properties are
+     * then copied on top.
+     */
+    private List<KuduColumnInfo> getColumnList(SinkInfo sinkInfo) {
+        List<StreamSinkFieldEntity> fieldList = sinkFieldMapper.selectBySinkId(sinkInfo.getId());
+
+        List<KuduColumnInfo> columnList = new ArrayList<>();
+        for (StreamSinkFieldEntity field : fieldList) {
+            KuduColumnInfo kuduColumnInfo = StringUtils.isNotBlank(field.getExtParams())
+                    ? KuduColumnInfo.getFromJson(field.getExtParams())
+                    : new KuduColumnInfo();
+            CommonBeanUtils.copyProperties(field, kuduColumnInfo, true);
+            columnList.add(kuduColumnInfo);
+        }
+
+        return columnList;
+    }
+}
diff --git a/pom.xml b/pom.xml
index a5759686d..900019a50 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1114,6 +1114,13 @@
<version>${hudi.version}</version>
</dependency>
+ <!-- kudu -->
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>${kudu.version}</version>
+ </dependency>
+
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>