You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/30 11:41:11 UTC

[incubator-inlong] branch master updated: [INLONG-3462][Manager] Add test module for manager client (#3469)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2680b51  [INLONG-3462][Manager] Add test module for manager client (#3469)
2680b51 is described below

commit 2680b517b6bbd5cf58659437deb2f607657a2f2c
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Mar 30 19:41:05 2022 +0800

    [INLONG-3462][Manager] Add test module for manager client (#3469)
---
 inlong-manager/manager-client-test/pom.xml         |  43 +++++
 .../inlong/manager/client/Binlog2KafkaTest.java    | 212 +++++++++++++++++++++
 .../inlong/manager/client/Kafka2HiveTest.java      | 193 +++++++++++++++++++
 .../inlong/manager/client/api/SinkField.java       |   5 +
 .../inlong/manager/client/api/StreamField.java     |   8 +
 inlong-manager/pom.xml                             |   1 +
 6 files changed, 462 insertions(+)

diff --git a/inlong-manager/manager-client-test/pom.xml b/inlong-manager/manager-client-test/pom.xml
new file mode 100644
index 0000000..316444e
--- /dev/null
+++ b/inlong-manager/manager-client-test/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xmlns="http://maven.apache.org/POM/4.0.0"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>inlong-manager</artifactId>
+        <version>1.1.0-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>manager-client-test</artifactId> <!-- module that holds the tests for manager-client -->
+    <name>Apache InLong - Manager Client Test</name>
+
+    <dependencies>
+        <dependency> <!-- the manager client; its version follows this module's own ${project.version} -->
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>manager-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency> <!-- test scope only; no version is declared here, presumably managed by a parent POM (confirm) -->
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/inlong-manager/manager-client-test/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaTest.java b/inlong-manager/manager-client-test/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaTest.java
new file mode 100644
index 0000000..986baf8
--- /dev/null
+++ b/inlong-manager/manager-client-test/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.DataSeparator;
+import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
+import org.apache.inlong.manager.client.api.InlongClient;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupConf;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
+import org.apache.inlong.manager.client.api.InlongStreamBuilder;
+import org.apache.inlong.manager.client.api.InlongStreamConf;
+import org.apache.inlong.manager.client.api.PulsarBaseConf;
+import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
+import org.apache.inlong.manager.client.api.sink.KafkaSink;
+import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
+import org.apache.inlong.manager.common.enums.MQType;
+import org.apache.shiro.util.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class Binlog2KafkaTest {
+
+    // Manager web url
+    public static String SERVICE_URL = "127.0.0.1:8083";
+    // Inlong user && passwd
+    public static DefaultAuthentication INLONG_AUTH = new DefaultAuthentication("admin", "inlong");
+    // Inlong group name
+    public static String GROUP_NAME = "{group.name}";
+    // Inlong stream name
+    public static String STREAM_NAME = "{stream.name}";
+    // Flink cluster url
+    public static String FLINK_URL = "{flink.cluster.url}";
+    // Pulsar cluster admin url
+    public static String PULSAR_ADMIN_URL = "{pulsar.admin.url}";
+    // Pulsar cluster service url
+    public static String PULSAR_SERVICE_URL = "{pulsar.service.url}";
+    // Pulsar tenant
+    public static String tenant = "{pulsar.tenant}";
+    // Pulsar topic
+    public static String topic = "{pulsar.topic}";
+
+    @Test
+    public void testCreateGroupForKafka() throws Exception {
+        ClientConfiguration configuration = new ClientConfiguration();
+        configuration.setWriteTimeout(10);
+        configuration.setReadTimeout(10);
+        configuration.setConnectTimeout(10);
+        configuration.setTimeUnit(TimeUnit.SECONDS);
+        configuration.setAuthentication(INLONG_AUTH);
+        InlongClient inlongClient = InlongClient.create(SERVICE_URL, configuration);
+        InlongGroupConf groupConf = createGroupConf();
+        try {
+            InlongGroup group = inlongClient.forGroup(groupConf);
+            InlongStreamConf streamConf = createStreamConf();
+            InlongStreamBuilder streamBuilder = group.createStream(streamConf);
+            streamBuilder.source(createMysqlSource());
+            streamBuilder.sink(createKafkaSink());
+            streamBuilder.initOrUpdate();
+            // start group
+            InlongGroupContext inlongGroupContext = group.init();
+            Assert.notNull(inlongGroupContext);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testStopGroup() {
+        ClientConfiguration configuration = new ClientConfiguration();
+        configuration.setWriteTimeout(10);
+        configuration.setReadTimeout(10);
+        configuration.setConnectTimeout(10);
+        configuration.setTimeUnit(TimeUnit.SECONDS);
+        configuration.setAuthentication(INLONG_AUTH);
+        InlongClient inlongClient = InlongClient.create(SERVICE_URL, configuration);
+        InlongGroupConf groupConf = createGroupConf();
+        try {
+            InlongGroup group = inlongClient.forGroup(groupConf);
+            InlongGroupContext groupContext = group.delete(true);
+            Assert.notNull(groupContext);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testRestartGroup() {
+        ClientConfiguration configuration = new ClientConfiguration();
+        configuration.setWriteTimeout(10);
+        configuration.setReadTimeout(10);
+        configuration.setConnectTimeout(10);
+        configuration.setTimeUnit(TimeUnit.SECONDS);
+        configuration.setAuthentication(INLONG_AUTH);
+        InlongClient inlongClient = InlongClient.create(SERVICE_URL, configuration);
+        InlongGroupConf groupConf = createGroupConf();
+        try {
+            InlongGroup group = inlongClient.forGroup(groupConf);
+            InlongGroupContext groupContext = group.restart(true);
+            Assert.notNull(groupContext);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testSuspendGroup() {
+        ClientConfiguration configuration = new ClientConfiguration();
+        configuration.setWriteTimeout(10);
+        configuration.setReadTimeout(10);
+        configuration.setConnectTimeout(10);
+        configuration.setTimeUnit(TimeUnit.SECONDS);
+        configuration.setAuthentication(INLONG_AUTH);
+        InlongClient inlongClient = InlongClient.create(SERVICE_URL, configuration);
+        InlongGroupConf groupConf = createGroupConf();
+        try {
+            InlongGroup group = inlongClient.forGroup(groupConf);
+            InlongGroupContext groupContext = group.suspend(true);
+            Assert.notNull(groupContext);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private InlongGroupConf createGroupConf() {
+        InlongGroupConf inlongGroupConf = new InlongGroupConf();
+        inlongGroupConf.setGroupName(GROUP_NAME);
+        inlongGroupConf.setDescription(GROUP_NAME);
+        inlongGroupConf.setProxyClusterId(1);
+        //pulsar conf
+        PulsarBaseConf pulsarBaseConf = new PulsarBaseConf();
+        pulsarBaseConf.setType(MQType.PULSAR);
+        inlongGroupConf.setMqBaseConf(pulsarBaseConf);
+        pulsarBaseConf.setPulsarServiceUrl(PULSAR_SERVICE_URL);
+        pulsarBaseConf.setPulsarAdminUrl(PULSAR_ADMIN_URL);
+        pulsarBaseConf.setNamespace("public");
+        pulsarBaseConf.setEnableCreateResource(false);
+        pulsarBaseConf.setTenant(tenant);
+        //flink conf
+        FlinkSortBaseConf sortBaseConf = new FlinkSortBaseConf();
+        inlongGroupConf.setSortBaseConf(sortBaseConf);
+        sortBaseConf.setServiceUrl(FLINK_URL);
+        Map<String, String> map = new HashMap<>(16);
+        sortBaseConf.setProperties(map);
+        //enable zk
+        inlongGroupConf.setZookeeperEnabled(false);
+        inlongGroupConf.setDailyRecords(10000000L);
+        inlongGroupConf.setPeakRecords(100000L);
+        inlongGroupConf.setMaxLength(10000);
+        return inlongGroupConf;
+    }
+
+    private InlongStreamConf createStreamConf() {
+        InlongStreamConf streamConf = new InlongStreamConf();
+        streamConf.setName(STREAM_NAME);
+        streamConf.setDescription(STREAM_NAME);
+        streamConf.setCharset(StandardCharsets.UTF_8);
+        streamConf.setDataSeparator(DataSeparator.VERTICAL_BAR);
+        // true if you need strictly order for data
+        streamConf.setStrictlyOrdered(true);
+        streamConf.setTopic(topic);
+        return streamConf;
+    }
+
+    public MySQLBinlogSource createMysqlSource() {
+        MySQLBinlogSource mySQLBinlogSource = new MySQLBinlogSource();
+        mySQLBinlogSource.setDbNames(Arrays.asList("{db.name}"));
+        mySQLBinlogSource.setHostname("{db.url}");
+        mySQLBinlogSource.setAuthentication(new DefaultAuthentication("root", "inlong"));
+        mySQLBinlogSource.setSourceName("{mysql.source.name}");
+        mySQLBinlogSource.setAllMigration(true);
+        return mySQLBinlogSource;
+    }
+
+    private KafkaSink createKafkaSink() {
+        KafkaSink kafkaSink = new KafkaSink();
+        kafkaSink.setDataFormat(DataFormat.CANAL);
+        kafkaSink.setAddress("{kafka.bootstrap}");
+        kafkaSink.setTopicName("{kafka.topic}");
+        kafkaSink.setNeedCreated(false);
+        kafkaSink.setSinkName("{kafka.sink.name}");
+        Map<String, Object> properties = new HashMap<>();
+        //Not needed if kafka cluster is not set
+        properties.put("transaction.timeout.ms", 9000000);
+        kafkaSink.setProperties(properties);
+        return kafkaSink;
+    }
+}
diff --git a/inlong-manager/manager-client-test/src/test/java/org/apache/inlong/manager/client/Kafka2HiveTest.java b/inlong-manager/manager-client-test/src/test/java/org/apache/inlong/manager/client/Kafka2HiveTest.java
new file mode 100644
index 0000000..fb60c28
--- /dev/null
+++ b/inlong-manager/manager-client-test/src/test/java/org/apache/inlong/manager/client/Kafka2HiveTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.DataSeparator;
+import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
+import org.apache.inlong.manager.client.api.InlongClient;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupConf;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
+import org.apache.inlong.manager.client.api.InlongStreamBuilder;
+import org.apache.inlong.manager.client.api.InlongStreamConf;
+import org.apache.inlong.manager.client.api.PulsarBaseConf;
+import org.apache.inlong.manager.client.api.SinkField;
+import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
+import org.apache.inlong.manager.client.api.sink.HiveSink;
+import org.apache.inlong.manager.client.api.source.KafkaSource;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.enums.FileFormat;
+import org.apache.shiro.util.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class Kafka2HiveTest {
+
+    // Manager web url
+    public static String SERVICE_URL = "127.0.0.1:8083";
+    // Inlong user && passwd
+    public static DefaultAuthentication INLONG_AUTH = new DefaultAuthentication("admin", "inlong");
+    // Inlong group name
+    public static String GROUP_NAME = "{group.name}";
+    // Inlong stream name
+    public static String STREAM_NAME = "{stream.name}";
+    // Flink cluster url
+    public static String FLINK_URL = "{flink.cluster.url}";
+    // Pulsar cluster admin url
+    public static String PULSAR_ADMIN_URL = "{pulsar.admin.url}";
+    // Pulsar cluster service url
+    public static String PULSAR_SERVICE_URL = "{pulsar.service.url}";
+    // Pulsar tenant
+    public static String tenant = "{pulsar.tenant}";
+    // Pulsar topic
+    public static String topic = "{pulsar.topic}";
+
+    @Test
+    public void testCreateGroupForHive() {
+        ClientConfiguration configuration = new ClientConfiguration();
+        configuration.setWriteTimeout(10);
+        configuration.setReadTimeout(10);
+        configuration.setConnectTimeout(10);
+        configuration.setTimeUnit(TimeUnit.SECONDS);
+        configuration.setAuthentication(INLONG_AUTH);
+        InlongClient inlongClient = InlongClient.create(SERVICE_URL, configuration);
+        InlongGroupConf groupConf = createGroupConf();
+        try {
+            InlongGroup group = inlongClient.forGroup(groupConf);
+            InlongStreamConf streamConf = createStreamConf();
+            InlongStreamBuilder streamBuilder = group.createStream(streamConf);
+            streamBuilder.fields(createStreamFields());
+            streamBuilder.source(createKafkaSource());
+            streamBuilder.sink(createHiveSink());
+            streamBuilder.initOrUpdate();
+            // start group
+            InlongGroupContext inlongGroupContext = group.init();
+            Assert.notNull(inlongGroupContext);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testStopGroup() {
+        ClientConfiguration configuration = new ClientConfiguration();
+        configuration.setWriteTimeout(10);
+        configuration.setReadTimeout(10);
+        configuration.setConnectTimeout(10);
+        configuration.setTimeUnit(TimeUnit.SECONDS);
+        configuration.setAuthentication(INLONG_AUTH);
+        InlongClient inlongClient = InlongClient.create(SERVICE_URL, configuration);
+        InlongGroupConf groupConf = createGroupConf();
+        try {
+            InlongGroup group = inlongClient.forGroup(groupConf);
+            InlongGroupContext groupContext = group.delete();
+            Assert.notNull(groupContext);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private InlongGroupConf createGroupConf() {
+        InlongGroupConf inlongGroupConf = new InlongGroupConf();
+        inlongGroupConf.setGroupName(GROUP_NAME);
+        inlongGroupConf.setDescription(GROUP_NAME);
+        inlongGroupConf.setProxyClusterId(1);
+        //pulsar conf
+        PulsarBaseConf pulsarBaseConf = new PulsarBaseConf();
+        inlongGroupConf.setMqBaseConf(pulsarBaseConf);
+        pulsarBaseConf.setPulsarServiceUrl(PULSAR_SERVICE_URL);
+        pulsarBaseConf.setPulsarAdminUrl(PULSAR_ADMIN_URL);
+        pulsarBaseConf.setNamespace("public");
+        pulsarBaseConf.setEnableCreateResource(false);
+        pulsarBaseConf.setTenant(tenant);
+
+        //flink conf
+        FlinkSortBaseConf sortBaseConf = new FlinkSortBaseConf();
+        inlongGroupConf.setSortBaseConf(sortBaseConf);
+        sortBaseConf.setServiceUrl(FLINK_URL);
+        Map<String, String> map = new HashMap<>(16);
+        sortBaseConf.setProperties(map);
+        //enable zk
+        inlongGroupConf.setZookeeperEnabled(false);
+        inlongGroupConf.setDailyRecords(10000000L);
+        inlongGroupConf.setPeakRecords(100000L);
+        inlongGroupConf.setMaxLength(10000);
+        return inlongGroupConf;
+    }
+
+    private InlongStreamConf createStreamConf() {
+        InlongStreamConf streamConf = new InlongStreamConf();
+        streamConf.setName(STREAM_NAME);
+        streamConf.setDescription(STREAM_NAME);
+        streamConf.setCharset(StandardCharsets.UTF_8);
+        streamConf.setDataSeparator(DataSeparator.VERTICAL_BAR);
+        // true if you need strictly order for data
+        streamConf.setStrictlyOrdered(true);
+        streamConf.setTopic(topic);
+        return streamConf;
+    }
+
+    public KafkaSource createKafkaSource() {
+        KafkaSource kafkaSource = new KafkaSource();
+        kafkaSource.setBootstrapServers("{kafka.bootstrap}");
+        kafkaSource.setTopic("{kafka.topic}");
+        kafkaSource.setSourceName("{kafka.source.name}");
+        kafkaSource.setDataFormat(DataFormat.JSON);
+        return kafkaSource;
+    }
+
+    private HiveSink createHiveSink() {
+        HiveSink hiveSink = new HiveSink();
+        hiveSink.setDbName("{db.name}");
+        hiveSink.setJdbcUrl("jdbc:hive2://{ip:port}");
+        hiveSink.setAuthentication(new DefaultAuthentication("hive", "hive"));
+        hiveSink.setCharset(StandardCharsets.UTF_8);
+        hiveSink.setFileFormat(FileFormat.TextFile);
+        hiveSink.setDataSeparator(DataSeparator.VERTICAL_BAR);
+        hiveSink.setDataPath("hdfs://{ip:port}/usr/hive/warehouse/{db.name}");
+
+        List<SinkField> fields = new ArrayList<>();
+        SinkField field1 = new SinkField(0, FieldType.INT, "age", FieldType.INT, "age");
+        SinkField field2 = new SinkField(1, FieldType.STRING, "name", FieldType.STRING, "name");
+        fields.add(field1);
+        fields.add(field2);
+        hiveSink.setSinkFields(fields);
+        hiveSink.setTableName("{table.name}");
+        hiveSink.setSinkName("{hive.sink.name}");
+        return hiveSink;
+    }
+
+    public List<StreamField> createStreamFields() {
+        List<StreamField> streamFieldList = Lists.newArrayList();
+        streamFieldList.add(new StreamField(0, FieldType.STRING, "name", null, null));
+        streamFieldList.add(new StreamField(1, FieldType.INT, "age", null, null));
+        return streamFieldList;
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
index 9ed71b3..acb8c71 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
@@ -36,6 +36,11 @@ public class SinkField extends StreamField {
     @ApiModelProperty("Source field type")
     private FieldType sourceFieldType;
 
+    public SinkField(int index, FieldType fieldType, String fieldName, FieldType sourceFieldType,
+            String sourceFieldName) { // shorthand: fieldComment, fieldValue, fieldFormat = null; isMetaField = 0
+        this(index, fieldType, fieldName, null, null, sourceFieldName, sourceFieldType, 0, null);
+    }
+
     public SinkField(int index, FieldType fieldType, String fieldName, String fieldComment,
             String fieldValue, String sourceFieldName, FieldType sourceFieldType,
             Integer isMetaField, String fieldFormat) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
index ba08425..3f487d0 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
@@ -30,6 +30,14 @@ import org.apache.inlong.manager.common.enums.FieldType;
 @ApiModel("Stream field configuration")
 public class StreamField {
 
+    public StreamField(int index, FieldType fieldType, String fieldName, String fieldComment, String fieldValue) {
+        this.fieldName = fieldName;
+        this.fieldType = fieldType;
+        this.fieldValue = fieldValue;
+        this.fieldComment = fieldComment;
+        this.id = index; // the index argument is stored in the id property
+    }
+
     @ApiModelProperty("Field index")
     private Integer id;
 
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index ac33d7e..94a1b0d 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -39,6 +39,7 @@
         <module>manager-docker</module>
         <module>manager-plugin-examples</module>
         <module>manager-client</module>
+        <module>manager-client-test</module>
     </modules>
 
     <properties>