You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/03 06:53:58 UTC

[inlong] branch master updated (d3286fa93 -> 22b159a45)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


 discard d3286fa93 [INLONG-4845][Dashboard] Update the Tube configuration form (#4846)
 discard 673187f78 [INLONG-4843][Manager] Add RPC URL for TubeMQ cluster  (#4844)
    omit 051e83ced [INLONG-4774][Manager] Support auth and encryption key user config (#4775)
     new a2145ce4c [INLONG-4774][Manager] Support auth and encryption key user config (#4775)
     new f670f098d [INLONG-4843][Manager] Add RPC URL for TubeMQ cluster (#4844)
     new 22b159a45 [INLONG-4845][Dashboard] Update the Tube configuration form (#4846)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (d3286fa93)
            \
             N -- N -- N   refs/heads/master (22b159a45)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[inlong] 01/03: [INLONG-4774][Manager] Support auth and encryption key user config (#4775)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit a2145ce4c85142313637eef41559068f975cd5ce
Author: woofyzhao <49...@qq.com>
AuthorDate: Sat Jul 2 17:19:39 2022 +0800

    [INLONG-4774][Manager] Support auth and encryption key user config (#4775)
    
    * Support auth and encryption key user config
    
    * Support both encrypted and unencrypted data store
    
    * Update some classes name and property files
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../inlong/manager/common/pojo/user/UserInfo.java  |  12 +-
 .../inlong/manager/common/util/AESUtils.java       | 192 +++++++++++++++++++++
 .../inlong/manager/common/util/RSAUtils.java       | 122 +++++++++++++
 .../inlong/manager/common/util/AESUtilsTest.java   |  61 +++++++
 .../src/test/resources/application.properties      |  24 +++
 .../inlong/manager/dao/entity/UserEntity.java      |   7 +-
 .../main/resources/mappers/UserEntityMapper.xml    |  78 +++++++--
 .../manager/service/core/impl/UserServiceImpl.java |  48 +++++-
 .../main/resources/h2/apache_inlong_manager.sql    |  22 ++-
 .../manager-web/sql/apache_inlong_manager.sql      |  22 ++-
 .../src/main/resources/application.properties      |   4 +
 .../manager/web/controller/AnnoControllerTest.java |   5 +-
 .../resources/application.properties               |   4 +
 13 files changed, 555 insertions(+), 46 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/user/UserInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/user/UserInfo.java
index cc6f9dadc..d4d5ada15 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/user/UserInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/user/UserInfo.java
@@ -43,8 +43,7 @@ public class UserInfo {
     private Integer id;
 
     /**
-     * user type
-     * {@link UserTypeEnum}
+     * user type {@link UserTypeEnum}
      */
     @NotNull
     @InEnumInt(UserTypeEnum.class)
@@ -59,6 +58,15 @@ public class UserInfo {
     @ApiModelProperty(value = "password", required = true)
     private String password;
 
+    @ApiModelProperty("secret key")
+    private String secretKey;
+
+    @ApiModelProperty("public key")
+    private String publicKey;
+
+    @ApiModelProperty("private key")
+    private String privateKey;
+
     @NotNull
     @Min(1)
     @ApiModelProperty(value = "valid days", required = true)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/AESUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/AESUtils.java
new file mode 100644
index 000000000..a3676421f
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/AESUtils.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.crypto.Cipher;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.SecureRandom;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * AES encryption and decryption utils.
+ */
+@Slf4j
+public class AESUtils {
+
+    private static final int DEFAULT_VERSION = 1;
+    private static final int KEY_SIZE = 128;
+    private static final String ALGORITHM = "AES";
+    private static final String RNG_ALGORITHM = "SHA1PRNG";
+
+    private static final String CONFIG_FILE = "application.properties";
+    private static final String CONFIG_ITEM_ENCRYPT_KEY_PREFIX = "inlong.encrypt.key.value";
+    private static final String CONFIG_ITEM_ENCRYPT_VERSION = "inlong.encrypt.version";
+
+    public static Map<Integer, String> AES_KEY_MAP = new ConcurrentHashMap<>();
+    public static Integer CURRENT_VERSION;
+
+    /**
+     * Load the application properties configuration
+     */
+    private static Properties getApplicationProperties() throws IOException {
+        Properties properties = new Properties();
+        String path = Thread.currentThread().getContextClassLoader().getResource("").getPath() + CONFIG_FILE;
+        InputStream inputStream = new BufferedInputStream(Files.newInputStream(Paths.get(path)));
+        properties.load(inputStream);
+        return properties;
+    }
+
+    /**
+     * Get the current aes key version
+     */
+    public static Integer getCurrentVersion(Properties properties) throws IOException {
+        if (CURRENT_VERSION != null) {
+            return CURRENT_VERSION;
+        }
+
+        if (properties == null) {
+            properties = getApplicationProperties();
+        }
+        String verStr = properties.getProperty(CONFIG_ITEM_ENCRYPT_VERSION);
+        if (StringUtils.isNotEmpty(verStr)) {
+            CURRENT_VERSION = Integer.valueOf(verStr);
+        } else {
+            CURRENT_VERSION = DEFAULT_VERSION;
+        }
+        log.debug("Crypto CURRENT_VERSION = {}", CURRENT_VERSION);
+        return CURRENT_VERSION;
+    }
+
+    /**
+     * Get aes key from config file
+     */
+    public static String getAesKeyByConfig(Integer version) throws Exception {
+        Properties properties = getApplicationProperties();
+        Integer targetVersion = (version == null ? getCurrentVersion(properties) : version);
+        if (StringUtils.isNotEmpty(AES_KEY_MAP.get(targetVersion))) {
+            return AES_KEY_MAP.get(targetVersion);
+        }
+
+        // get aes key under specified version
+        String keyName = CONFIG_ITEM_ENCRYPT_KEY_PREFIX + targetVersion;
+        String aesKey = properties.getProperty(keyName);
+        if (StringUtils.isEmpty(aesKey)) {
+            throw new RuntimeException(String.format("cannot find encryption key %s in application config", keyName));
+        }
+        AES_KEY_MAP.put(targetVersion, aesKey);
+        return aesKey;
+    }
+
+    /**
+     * Generate key
+     */
+    private static SecretKey generateKey(byte[] aesKey) throws Exception {
+        SecureRandom random = SecureRandom.getInstance(RNG_ALGORITHM);
+        random.setSeed(aesKey);
+        KeyGenerator gen = KeyGenerator.getInstance(ALGORITHM);
+        gen.init(KEY_SIZE, random);
+        return gen.generateKey();
+    }
+
+    /**
+     * Encrypt by key
+     */
+    public static byte[] encrypt(byte[] plainBytes, byte[] key) throws Exception {
+        SecretKey secKey = generateKey(key);
+        Cipher cipher = Cipher.getInstance(ALGORITHM);
+        cipher.init(Cipher.ENCRYPT_MODE, secKey);
+        return cipher.doFinal(plainBytes);
+    }
+
+    /**
+     * Encrypt with the key configured for the given version and return a hex string; a null version means no encryption
+     */
+    public static String encryptToString(byte[] plainBytes, Integer version) throws Exception {
+        if (version == null) {
+            // no encryption
+            return new String(plainBytes, StandardCharsets.UTF_8);
+        }
+        byte[] keyBytes = getAesKeyByConfig(version).getBytes(StandardCharsets.UTF_8);
+        return parseByte2HexStr(encrypt(plainBytes, keyBytes));
+    }
+
+    /**
+     * Decrypt by key
+     */
+    public static byte[] decrypt(byte[] cipherBytes, byte[] key) throws Exception {
+        SecretKey secKey = generateKey(key);
+        Cipher cipher = Cipher.getInstance(ALGORITHM);
+        cipher.init(Cipher.DECRYPT_MODE, secKey);
+        return cipher.doFinal(cipherBytes);
+    }
+
+    /**
+     * Decrypt with the key configured for the given version; a null version means the text is treated as plain text
+     */
+    public static byte[] decryptAsString(String cipherText, Integer version) throws Exception {
+        if (version == null) {
+            // No decryption: treated as plain text
+            return cipherText.getBytes(StandardCharsets.UTF_8);
+        }
+        byte[] keyBytes = getAesKeyByConfig(version).getBytes(StandardCharsets.UTF_8);
+        return decrypt(parseHexStr2Byte(cipherText), keyBytes);
+    }
+
+    /**
+     * Convert a byte array to an uppercase hex string
+     */
+    public static String parseByte2HexStr(byte[] buf) {
+        StringBuilder strBuf = new StringBuilder();
+        for (byte b : buf) {
+            String hex = Integer.toHexString(b & 0xFF);
+            if (hex.length() == 1) {
+                hex = '0' + hex;
+            }
+            strBuf.append(hex.toUpperCase());
+        }
+        return strBuf.toString();
+    }
+
+    /**
+     * Parse a hex string into a byte array
+     */
+    public static byte[] parseHexStr2Byte(String hexStr) {
+        if (hexStr.length() < 1) {
+            return null;
+        }
+        byte[] result = new byte[hexStr.length() / 2];
+        for (int i = 0; i < hexStr.length() / 2; i++) {
+            int high = Integer.parseInt(hexStr.substring(i * 2, i * 2 + 1), 16);
+            int low = Integer.parseInt(hexStr.substring(i * 2 + 1, i * 2 + 2), 16);
+            result[i] = (byte) (high * 16 + low);
+        }
+        return result;
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/RSAUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/RSAUtils.java
new file mode 100644
index 000000000..cf3bf9635
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/RSAUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.util;
+
+import org.apache.commons.codec.binary.Base64;
+
+import javax.crypto.Cipher;
+import java.io.ByteArrayOutputStream;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.interfaces.RSAPrivateKey;
+import java.security.interfaces.RSAPublicKey;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * RSA encryption and decryption utils.
+ */
+public class RSAUtils {
+
+    public static final String PUBLIC_KEY = "RSAPublicKey";
+    public static final String PRIVATE_KEY = "RSAPrivateKey";
+    private static final String KEY_ALGORITHM = "RSA";
+    private static final String SIGNATURE_ALGORITHM = "SHA1PRNG";
+    private static final int MAX_ENCRYPT_BLOCK = 117;
+    private static final int MAX_DECRYPT_BLOCK = 128;
+    private static SecureRandom random;
+
+    static {
+        try {
+            random = SecureRandom.getInstance(SIGNATURE_ALGORITHM);
+        } catch (NoSuchAlgorithmException ignored) {
+            // impossible
+        }
+    }
+
+    /**
+     * Generate RSA key pairs
+     */
+    public static Map<String, String> generateRSAKeyPairs() throws NoSuchAlgorithmException {
+        Map<String, String> keyPairMap = new HashMap<>();
+        KeyPairGenerator generator = KeyPairGenerator.getInstance(KEY_ALGORITHM);
+        generator.initialize(1024, random);
+        KeyPair keyPair = generator.genKeyPair();
+        RSAPublicKey publicKey = (RSAPublicKey) keyPair.getPublic();
+        RSAPrivateKey privateKey = (RSAPrivateKey) keyPair.getPrivate();
+        keyPairMap.put(PUBLIC_KEY, Base64.encodeBase64String(publicKey.getEncoded()));
+        keyPairMap.put(PRIVATE_KEY, Base64.encodeBase64String(privateKey.getEncoded()));
+        return keyPairMap;
+    }
+
+    /**
+     * Encryption by public key
+     */
+    public static byte[] encryptByPublicKey(byte[] data, RSAPublicKey publicKey) throws Exception {
+        Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);
+        cipher.init(Cipher.ENCRYPT_MODE, publicKey);
+        int inputLen = data.length;
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        int offSet = 0;
+        byte[] cache;
+        int i = 0;
+        // encrypt data by block
+        while (inputLen - offSet > 0) {
+            if (inputLen - offSet > MAX_ENCRYPT_BLOCK) {
+                cache = cipher.doFinal(data, offSet, MAX_ENCRYPT_BLOCK);
+            } else {
+                cache = cipher.doFinal(data, offSet, inputLen - offSet);
+            }
+            out.write(cache, 0, cache.length);
+            i++;
+            offSet = i * MAX_ENCRYPT_BLOCK;
+        }
+        byte[] encryptedData = out.toByteArray();
+        out.close();
+        return encryptedData;
+    }
+
+    /**
+     * Decryption by private key
+     */
+    public static byte[] decryptByPrivateKey(byte[] encryptedData, RSAPrivateKey privateKey) throws Exception {
+        Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);
+        cipher.init(Cipher.DECRYPT_MODE, privateKey);
+        int inputLen = encryptedData.length;
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        int offSet = 0;
+        byte[] cache;
+        int i = 0;
+        // decrypt data by block
+        while (inputLen - offSet > 0) {
+            if (inputLen - offSet > MAX_DECRYPT_BLOCK) {
+                cache = cipher.doFinal(encryptedData, offSet, MAX_DECRYPT_BLOCK);
+            } else {
+                cache = cipher.doFinal(encryptedData, offSet, inputLen - offSet);
+            }
+            out.write(cache, 0, cache.length);
+            i++;
+            offSet = i * MAX_DECRYPT_BLOCK;
+        }
+        byte[] decryptedData = out.toByteArray();
+        out.close();
+        return decryptedData;
+    }
+}
diff --git a/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/util/AESUtilsTest.java b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/util/AESUtilsTest.java
new file mode 100644
index 000000000..82388272e
--- /dev/null
+++ b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/util/AESUtilsTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.util;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * AES encryption and decryption util test.
+ */
+public class AESUtilsTest {
+
+    @Test
+    public void testEncryptDecryptDirectly() throws Exception {
+        byte[] key = "key-123".getBytes(StandardCharsets.UTF_8);
+        String plainText = "hello, inlong";
+        byte[] cipheredBytes = AESUtils.encrypt(plainText.getBytes(StandardCharsets.UTF_8), key);
+        byte[] decipheredBytes = AESUtils.decrypt(cipheredBytes, key);
+        Assertions.assertEquals(plainText, new String(decipheredBytes, StandardCharsets.UTF_8));
+    }
+
+    @Test
+    void testEncryptDecryptByConfigVersion() throws Exception {
+        String plainText = "hello, inlong again";
+        Integer version = AESUtils.getCurrentVersion(null);
+        String cipheredText = AESUtils.encryptToString(plainText.getBytes(StandardCharsets.UTF_8), version);
+        byte[] decipheredBytes = AESUtils.decryptAsString(cipheredText, version);
+        Assertions.assertEquals(plainText, new String(decipheredBytes, StandardCharsets.UTF_8));
+    }
+
+    @Test
+    void testEncryptDecryptByNullVersion() throws Exception {
+        String plainText = "hello, inlong again";
+
+        // when version is null no encryption is performed
+        String cipheredText = AESUtils.encryptToString(plainText.getBytes(StandardCharsets.UTF_8), null);
+        Assertions.assertEquals(plainText, cipheredText);
+
+        // when version is null no decryption is performed
+        byte[] decipheredBytes = AESUtils.decryptAsString(cipheredText, null);
+        Assertions.assertEquals(plainText, new String(decipheredBytes, StandardCharsets.UTF_8));
+    }
+}
diff --git a/inlong-manager/manager-common/src/test/resources/application.properties b/inlong-manager/manager-common/src/test/resources/application.properties
new file mode 100644
index 000000000..0481c7a54
--- /dev/null
+++ b/inlong-manager/manager-common/src/test/resources/application.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Configure auth plugin
+#inlong.auth.type=default
+# Encryption config: the numeric suffix of the 'inlong.encrypt.key.value' property name must match 'inlong.encrypt.version'.
+inlong.encrypt.version=1
+inlong.encrypt.key.value1="I!N@L#O$N%G^"
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/UserEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/UserEntity.java
index 52ccba49d..b4c6d181a 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/UserEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/UserEntity.java
@@ -17,9 +17,10 @@
 
 package org.apache.inlong.manager.dao.entity;
 
+import lombok.Data;
+
 import java.io.Serializable;
 import java.util.Date;
-import lombok.Data;
 
 /**
  * User entity, including username, password, etc.
@@ -31,6 +32,10 @@ public class UserEntity implements Serializable {
     private Integer id;
     private String name;
     private String password;
+    private String secretKey;
+    private String publicKey;
+    private String privateKey;
+    private Integer encryptVersion;
     private Integer accountType;
     private Date dueDate;
     private Date createTime;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/UserEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/UserEntityMapper.xml
index 3ecda201f..84c3e3b1d 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/UserEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/UserEntityMapper.xml
@@ -24,6 +24,10 @@
         <id column="id" jdbcType="INTEGER" property="id"/>
         <result column="name" jdbcType="VARCHAR" property="name"/>
         <result column="password" jdbcType="VARCHAR" property="password"/>
+        <result column="secret_key" jdbcType="VARCHAR" property="secretKey"/>
+        <result column="public_key" jdbcType="VARCHAR" property="publicKey"/>
+        <result column="private_key" jdbcType="VARCHAR" property="privateKey"/>
+        <result column="encrypt_version" jdbcType="INTEGER" property="encryptVersion"/>
         <result column="account_type" jdbcType="INTEGER" property="accountType"/>
         <result column="due_date" jdbcType="TIMESTAMP" property="dueDate"/>
         <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
@@ -62,8 +66,8 @@
         </where>
     </sql>
     <sql id="Base_Column_List">
-        id, name, password, account_type, due_date, create_time, update_time, create_by,
-    update_by
+        id, name, password, secret_key, public_key, private_key, encrypt_version,
+        account_type, due_date, create_time, update_time, create_by, update_by
     </sql>
     <select id="selectByExample" parameterType="org.apache.inlong.manager.dao.entity.UserEntityExample"
             resultMap="BaseResultMap">
@@ -93,11 +97,15 @@
     </delete>
     <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.UserEntity">
         insert into user (id, name, password,
-                          account_type, due_date, create_time,
-                          update_time, create_by, update_by)
+                          secret_key, public_key, private_key,
+                          encrypt_version, account_type, due_date,
+                          create_time, update_time,
+                          create_by, update_by)
         values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR},
-                #{accountType,jdbcType=INTEGER}, #{dueDate,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP},
-                #{updateTime,jdbcType=TIMESTAMP}, #{createBy,jdbcType=VARCHAR}, #{updateBy,jdbcType=VARCHAR})
+                #{secretKey,jdbcType=VARCHAR}, #{publicKey,jdbcType=VARCHAR}, #{privateKey,jdbcType=VARCHAR},
+                #{encryptVersion,jdbcType=INTEGER}, #{accountType,jdbcType=INTEGER}, #{dueDate,jdbcType=TIMESTAMP},
+                #{createTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP}, #{createBy,jdbcType=VARCHAR},
+                #{updateBy,jdbcType=VARCHAR})
     </insert>
     <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.UserEntity">
         insert into user
@@ -111,6 +119,18 @@
             <if test="password != null">
                 password,
             </if>
+            <if test="secretKey != null">
+                secret_key,
+            </if>
+            <if test="publicKey != null">
+                public_key,
+            </if>
+            <if test="privateKey != null">
+                private_key,
+            </if>
+            <if test="encryptVersion != null">
+                encrypt_version,
+            </if>
             <if test="accountType != null">
                 account_type,
             </if>
@@ -140,6 +160,18 @@
             <if test="password != null">
                 #{password,jdbcType=VARCHAR},
             </if>
+            <if test="secretKey != null">
+                #{secretKey,jdbcType=VARCHAR},
+            </if>
+            <if test="publicKey != null">
+                #{publicKey,jdbcType=VARCHAR},
+            </if>
+            <if test="privateKey != null">
+                #{privateKey,jdbcType=VARCHAR},
+            </if>
+            <if test="encryptVersion != null">
+                #{encryptVersion,jdbcType=INTEGER},
+            </if>
             <if test="accountType != null">
                 #{accountType,jdbcType=INTEGER},
             </if>
@@ -176,6 +208,18 @@
             <if test="password != null">
                 password = #{password,jdbcType=VARCHAR},
             </if>
+            <if test="secretKey != null">
+                secret_key = #{secretKey,jdbcType=VARCHAR},
+            </if>
+            <if test="publicKey != null">
+                public_key = #{publicKey,jdbcType=VARCHAR},
+            </if>
+            <if test="privateKey != null">
+                private_key = #{privateKey,jdbcType=VARCHAR},
+            </if>
+            <if test="encryptVersion != null">
+                encrypt_version = #{encryptVersion,jdbcType=INTEGER},
+            </if>
             <if test="accountType != null">
                 account_type = #{accountType,jdbcType=INTEGER},
             </if>
@@ -199,14 +243,18 @@
     </update>
     <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.UserEntity">
         update user
-        set name         = #{name,jdbcType=VARCHAR},
-            password     = #{password,jdbcType=VARCHAR},
-            account_type = #{accountType,jdbcType=INTEGER},
-            due_date     = #{dueDate,jdbcType=TIMESTAMP},
-            create_time  = #{createTime,jdbcType=TIMESTAMP},
-            update_time  = #{updateTime,jdbcType=TIMESTAMP},
-            create_by    = #{createBy,jdbcType=VARCHAR},
-            update_by    = #{updateBy,jdbcType=VARCHAR}
+        set name            = #{name,jdbcType=VARCHAR},
+            password        = #{password,jdbcType=VARCHAR},
+            secret_key      = #{secretKey,jdbcType=VARCHAR},
+            public_key      = #{publicKey,jdbcType=VARCHAR},
+            private_key     = #{privateKey,jdbcType=VARCHAR},
+            encrypt_version = #{encryptVersion,jdbcType=INTEGER},
+            account_type    = #{accountType,jdbcType=INTEGER},
+            due_date        = #{dueDate,jdbcType=TIMESTAMP},
+            create_time     = #{createTime,jdbcType=TIMESTAMP},
+            update_time     = #{updateTime,jdbcType=TIMESTAMP},
+            create_by       = #{createBy,jdbcType=VARCHAR},
+            update_by       = #{updateBy,jdbcType=VARCHAR}
         where id = #{id,jdbcType=INTEGER}
     </update>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/UserServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/UserServiceImpl.java
index 727b0617e..46916d48b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/UserServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/UserServiceImpl.java
@@ -21,14 +21,18 @@ import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.inlong.manager.common.enums.UserTypeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.user.PasswordChangeRequest;
 import org.apache.inlong.manager.common.pojo.user.UserDetailListVO;
 import org.apache.inlong.manager.common.pojo.user.UserDetailPageRequest;
 import org.apache.inlong.manager.common.pojo.user.UserInfo;
+import org.apache.inlong.manager.common.util.AESUtils;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.LoginUserUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.common.util.RSAUtils;
 import org.apache.inlong.manager.common.util.SmallTools;
 import org.apache.inlong.manager.dao.entity.UserEntity;
 import org.apache.inlong.manager.dao.entity.UserEntityExample;
@@ -38,8 +42,10 @@ import org.apache.inlong.manager.service.core.UserService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.inlong.manager.common.util.SmallTools.getOverDueDate;
 
@@ -64,7 +70,6 @@ public class UserServiceImpl implements UserService {
     @Override
     public UserInfo getById(Integer userId) {
         Preconditions.checkNotNull(userId, "User id should not be empty");
-
         UserEntity entity = userMapper.selectByPrimaryKey(userId);
         Preconditions.checkNotNull(entity, "User not exists with id " + userId);
 
@@ -74,6 +79,20 @@ public class UserServiceImpl implements UserService {
         result.setValidDays(SmallTools.getValidDays(entity.getCreateTime(), entity.getDueDate()));
         result.setType(entity.getAccountType());
 
+        try {
+            // decipher according to stored key version
+            // note that if the version is null then the string is treated as unencrypted plain text
+            Integer version = entity.getEncryptVersion();
+            byte[] secretKeyBytes = AESUtils.decryptAsString(entity.getSecretKey(), version);
+            byte[] publicKeyBytes = AESUtils.decryptAsString(entity.getPublicKey(), version);
+            result.setSecretKey(new String(secretKeyBytes, StandardCharsets.UTF_8));
+            result.setPublicKey(new String(publicKeyBytes, StandardCharsets.UTF_8));
+        } catch (Exception e) {
+            String errMsg = String.format("decryption error: %s", e.getMessage());
+            log.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+
         log.debug("success to get user info by id={}", userId);
         return result;
     }
@@ -82,7 +101,7 @@ public class UserServiceImpl implements UserService {
     public boolean create(UserInfo userInfo) {
         String username = userInfo.getUsername();
         UserEntity userExists = getByName(username);
-        Preconditions.checkNull(userExists, "User [" + username + "] already exists");
+        Preconditions.checkNull(userExists, "username [" + username + "] already exists");
 
         UserEntity entity = new UserEntity();
         entity.setAccountType(userInfo.getType());
@@ -90,6 +109,22 @@ public class UserServiceImpl implements UserService {
         entity.setDueDate(getOverDueDate(userInfo.getValidDays()));
         entity.setCreateBy(LoginUserUtils.getLoginUserDetail().getUsername());
         entity.setName(username);
+        try {
+            Map<String, String> keyPairs = RSAUtils.generateRSAKeyPairs();
+            String publicKey = keyPairs.get(RSAUtils.PUBLIC_KEY);
+            String privateKey = keyPairs.get(RSAUtils.PRIVATE_KEY);
+            String secretKey = RandomStringUtils.randomAlphanumeric(8);
+            Integer encryptVersion = AESUtils.getCurrentVersion(null);
+            entity.setEncryptVersion(encryptVersion);
+            entity.setPublicKey(AESUtils.encryptToString(publicKey.getBytes(StandardCharsets.UTF_8), encryptVersion));
+            entity.setPrivateKey(AESUtils.encryptToString(privateKey.getBytes(StandardCharsets.UTF_8), encryptVersion));
+            entity.setSecretKey(AESUtils.encryptToString(secretKey.getBytes(StandardCharsets.UTF_8), encryptVersion));
+        } catch (Exception e) {
+            String errMsg = String.format("generate rsa key error: %s", e.getMessage());
+            log.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+
         entity.setCreateTime(new Date());
         Preconditions.checkTrue(userMapper.insert(entity) > 0, "Create user failed");
 
@@ -99,13 +134,13 @@ public class UserServiceImpl implements UserService {
 
     @Override
     public int update(UserInfo userInfo, String currentUser) {
-        Preconditions.checkNotNull(userInfo, "User info should not be null");
-        Preconditions.checkNotNull(userInfo.getId(), "User id should not be null");
+        Preconditions.checkNotNull(userInfo, "user info should not be null");
+        Preconditions.checkNotNull(userInfo.getId(), "user id should not be null");
 
         // Whether the current user is an administrator
         UserEntity currentUserEntity = getByName(currentUser);
         Preconditions.checkTrue(currentUserEntity.getAccountType().equals(UserTypeEnum.ADMIN.getCode()),
-                "The current user is not a manager and does not have permission to update users");
+                "current user is not a manager and does not have permission to update users");
 
         UserEntity entity = userMapper.selectByPrimaryKey(userInfo.getId());
         Preconditions.checkNotNull(entity, "User not exists with id " + userInfo.getId());
@@ -128,7 +163,6 @@ public class UserServiceImpl implements UserService {
         String oldPassword = request.getOldPassword();
         String oldPasswordMd = SmallTools.passwordMd5(oldPassword);
         Preconditions.checkTrue(oldPasswordMd.equals(entity.getPassword()), "Old password is wrong");
-
         String newPasswordMd5 = SmallTools.passwordMd5(request.getNewPassword());
         entity.setPassword(newPasswordMd5);
 
@@ -143,7 +177,7 @@ public class UserServiceImpl implements UserService {
         // Whether the current user is an administrator
         UserEntity entity = getByName(currentUser);
         Preconditions.checkTrue(entity.getAccountType().equals(UserTypeEnum.ADMIN.getCode()),
-                "The current user is not a manager and does not have permission to delete users");
+                "current user is not a manager and does not have permission to delete users");
 
         userMapper.deleteByPrimaryKey(userId);
         log.debug("success to delete user by id={}, current user={}", userId, currentUser);
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 23491c6bc..d91c8be18 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -558,15 +558,19 @@ CREATE TABLE IF NOT EXISTS `stream_sink_field`
 -- ----------------------------
 CREATE TABLE IF NOT EXISTS `user`
 (
-    `id`           int(11)      NOT NULL AUTO_INCREMENT,
-    `name`         varchar(256) NOT NULL COMMENT 'account name',
-    `password`     varchar(64)  NOT NULL COMMENT 'password md5',
-    `account_type` int(11)      NOT NULL DEFAULT '1' COMMENT 'account type, 0-manager 1-normal',
-    `due_date`     datetime              DEFAULT NULL COMMENT 'due date for account',
-    `create_time`  datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `update_time`  datetime              DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
-    `create_by`    varchar(256) NOT NULL COMMENT 'create by sb.',
-    `update_by`    varchar(256)          DEFAULT NULL COMMENT 'update by sb.',
+    `id`              int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `name`            varchar(256) NOT NULL COMMENT 'Username',
+    `password`        varchar(64)  NOT NULL COMMENT 'Password md5',
+    `secret_key`      varchar(256)          DEFAULT NULL COMMENT 'Auth key for public network access',
+    `public_key`      text                  DEFAULT NULL COMMENT 'Public key for asymmetric data encryption',
+    `private_key`     text                  DEFAULT NULL COMMENT 'Private key for asymmetric data encryption',
+    `encrypt_version` int(11)               DEFAULT NULL COMMENT 'Encryption key version',
+    `account_type`    int(11)      NOT NULL DEFAULT '1' COMMENT 'Account type, 0-manager 1-normal',
+    `due_date`        datetime              DEFAULT NULL COMMENT 'Due date for account',
+    `create_by`       varchar(256) NOT NULL COMMENT 'Creator name',
+    `update_by`       varchar(256)          DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`     datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `update_time`     datetime              DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_user_name` (`name`)
 );
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index d5a94d94f..295f62081 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -587,15 +587,19 @@ CREATE TABLE IF NOT EXISTS `stream_sink_field`
 -- ----------------------------
 CREATE TABLE IF NOT EXISTS `user`
 (
-    `id`           int(11)      NOT NULL AUTO_INCREMENT,
-    `name`         varchar(256) NOT NULL COMMENT 'account name',
-    `password`     varchar(64)  NOT NULL COMMENT 'password md5',
-    `account_type` int(11)      NOT NULL DEFAULT '1' COMMENT 'account type, 0-manager 1-normal',
-    `due_date`     datetime              DEFAULT NULL COMMENT 'due date for account',
-    `create_time`  datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `update_time`  datetime              DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
-    `create_by`    varchar(256) NOT NULL COMMENT 'create by sb.',
-    `update_by`    varchar(256)          DEFAULT NULL COMMENT 'update by sb.',
+    `id`              int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `name`            varchar(256) NOT NULL COMMENT 'Username',
+    `password`        varchar(64)  NOT NULL COMMENT 'Password md5',
+    `secret_key`      varchar(256)          DEFAULT NULL COMMENT 'Auth key for public network access',
+    `public_key`      text                  DEFAULT NULL COMMENT 'Public key for asymmetric data encryption',
+    `private_key`     text                  DEFAULT NULL COMMENT 'Private key for asymmetric data encryption',
+    `encrypt_version` int(11)               DEFAULT NULL COMMENT 'Encryption key version',
+    `account_type`    int(11)      NOT NULL DEFAULT '1' COMMENT 'Account type, 0-manager 1-normal',
+    `due_date`        datetime              DEFAULT NULL COMMENT 'Due date for account',
+    `create_by`       varchar(256) NOT NULL COMMENT 'Creator name',
+    `update_by`       varchar(256)          DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`     datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `update_time`     datetime              DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_user_name` (`name`)
 ) ENGINE = InnoDB
diff --git a/inlong-manager/manager-web/src/main/resources/application.properties b/inlong-manager/manager-web/src/main/resources/application.properties
index 6d01dc8cf..a05144673 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -54,3 +54,7 @@ common.http-client.connectionRequestTimeout=3000
 
 # Configure auth plugin
 inlong.auth.type=default
+
+# Encryption config: the numeric suffix of the 'inlong.encrypt.key.value<N>' property name must match 'inlong.encrypt.version'.
+inlong.encrypt.version=1
+inlong.encrypt.key.value1="I!N@L#O$N%G^"
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/AnnoControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/AnnoControllerTest.java
index 4983fd76b..a779ba72c 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/AnnoControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/AnnoControllerTest.java
@@ -70,8 +70,7 @@ class AnnoControllerTest extends WebBaseTest {
 
         Response<String> response = getResBody(mvcResult, String.class);
         Assertions.assertFalse(response.isSuccess());
-        Assertions.assertEquals("Username or password was incorrect, or the account has expired",
-                response.getErrMsg());
+        Assertions.assertTrue(response.getErrMsg().contains("incorrect"));
     }
 
     @Test
@@ -117,7 +116,7 @@ class AnnoControllerTest extends WebBaseTest {
 
         Response<Boolean> resBody = getResBody(mvcResult, Boolean.class);
         Assertions.assertFalse(resBody.isSuccess());
-        Assertions.assertEquals("User [admin] already exists", resBody.getErrMsg());
+        Assertions.assertTrue(resBody.getErrMsg().contains("already exists"));
     }
 
     @Test
diff --git a/inlong-manager/manager-web/src/main/resources/application.properties b/inlong-manager/manager-web/src/test/resources/application.properties
similarity index 93%
copy from inlong-manager/manager-web/src/main/resources/application.properties
copy to inlong-manager/manager-web/src/test/resources/application.properties
index 6d01dc8cf..a05144673 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/test/resources/application.properties
@@ -54,3 +54,7 @@ common.http-client.connectionRequestTimeout=3000
 
 # Configure auth plugin
 inlong.auth.type=default
+
+# Encryption config: the numeric suffix of the 'inlong.encrypt.key.value<N>' property name must match 'inlong.encrypt.version'.
+inlong.encrypt.version=1
+inlong.encrypt.key.value1="I!N@L#O$N%G^"


[inlong] 03/03: [INLONG-4845][Dashboard] Update the Tube configuration form (#4846)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 22b159a452f54b0bd5a5d3e7b38a7c821f166e11
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Sun Jul 3 14:43:47 2022 +0800

    [INLONG-4845][Dashboard] Update the Tube configuration form (#4846)
---
 inlong-dashboard/src/pages/Clusters/config/TubeMQ.tsx | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/inlong-dashboard/src/pages/Clusters/config/TubeMQ.tsx b/inlong-dashboard/src/pages/Clusters/config/TubeMQ.tsx
index 103d5adda..81ff8e31e 100644
--- a/inlong-dashboard/src/pages/Clusters/config/TubeMQ.tsx
+++ b/inlong-dashboard/src/pages/Clusters/config/TubeMQ.tsx
@@ -22,13 +22,28 @@ import type { ClsConfigItemType } from './types';
 export const TubeMQ: ClsConfigItemType[] = [
   {
     type: 'input',
-    label: 'MasterUrl',
+    label: 'MasterWebUrl',
+    name: 'masterWebUrl',
+    rules: [{ required: true }],
+    props: {
+      placeholder: 'http://127.0.0.1:8080',
+    },
+  },
+  {
+    type: 'input',
+    label: 'MasterRpcUrl',
     name: 'url',
     rules: [{ required: true }],
+    props: {
+      placeholder: '127.0.0.1:8715,127.0.1.1:8715',
+    },
   },
   {
     type: 'input',
     label: 'Token',
     name: 'token',
+    props: {
+      placeholder: 'Tube cluster token example: abc',
+    },
   },
 ];


[inlong] 02/03: [INLONG-4843][Manager] Add RPC URL for TubeMQ cluster (#4844)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f670f098d5dda1600c2fabca5b33f2b7aceffd99
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Sun Jul 3 14:18:28 2022 +0800

    [INLONG-4843][Manager] Add RPC URL for TubeMQ cluster (#4844)
---
 .../common/pojo/cluster/tube/TubeClusterDTO.java   | 61 ++++++++++++++++++++++
 .../common/pojo/cluster/tube/TubeClusterInfo.java  |  4 +-
 .../pojo/cluster/tube/TubeClusterRequest.java      |  4 ++
 .../service/cluster/TubeClusterOperator.java       | 28 +++++++++-
 .../manager/service/mq/util/TubeMQOperator.java    |  4 +-
 5 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
new file mode 100644
index 000000000..b3bde1042
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.manager.common.pojo.cluster.tube;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Tube cluster info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Tube cluster info")
+public class TubeClusterDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
+
+    @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080")
+    private String masterWebUrl;
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static TubeClusterDTO getFromJson(@NotNull String extParams) {
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, TubeClusterDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java
index 968a0de54..f32e89040 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.common.pojo.cluster.tube;
 
 import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -36,7 +37,8 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @ApiModel("Inlong cluster info for Tube")
 public class TubeClusterInfo extends ClusterInfo {
 
-    // no fields
+    @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080")
+    private String masterWebUrl;
 
     public TubeClusterInfo() {
         this.setType(ClusterType.TUBE);
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java
index 613905519..f8b3002d1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.common.pojo.cluster.tube;
 
 import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -35,6 +36,9 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @ApiModel("Inlong cluster request for Tube")
 public class TubeClusterRequest extends ClusterRequest {
 
+    @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080")
+    private String masterWebUrl;
+
     // no field
 
     public TubeClusterRequest() {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index 2a7e01c48..cb6eed3bc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -17,17 +17,22 @@
 
 package org.apache.inlong.manager.service.cluster;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterDTO;
 import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.service.group.InlongNoneMqOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
@@ -38,6 +43,9 @@ public class TubeClusterOperator extends AbstractClusterOperator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongNoneMqOperator.class);
 
+    @Autowired
+    private ObjectMapper objectMapper;
+
     @Override
     public Boolean accept(String clusterType) {
         return getClusterType().equals(clusterType);
@@ -50,7 +58,15 @@ public class TubeClusterOperator extends AbstractClusterOperator {
 
     @Override
     protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) {
-        LOGGER.info("do nothing for tube cluster in set target entity");
+            TubeClusterRequest tubeRequest = (TubeClusterRequest) request;
+            CommonBeanUtils.copyProperties(tubeRequest, targetEntity, true);
+            try {
+                TubeClusterDTO dto = objectMapper.convertValue(tubeRequest, TubeClusterDTO.class);
+                targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+                LOGGER.info("success to set entity for tube cluster");
+            } catch (Exception e) {
+                throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            }
     }
 
     @Override
@@ -58,7 +74,15 @@ public class TubeClusterOperator extends AbstractClusterOperator {
         if (entity == null) {
             throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
         }
-        return CommonBeanUtils.copyProperties(entity, TubeClusterInfo::new);
+        TubeClusterInfo tubeClusterInfo = new TubeClusterInfo();
+        CommonBeanUtils.copyProperties(entity, tubeClusterInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            TubeClusterDTO dto = TubeClusterDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, tubeClusterInfo);
+        }
+
+        LOGGER.info("success to get tube cluster info from entity");
+        return tubeClusterInfo;
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java
index 0f68ba995..1fa9e489a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java
@@ -69,7 +69,7 @@ public class TubeMQOperator {
      * Create topic for the given tube cluster.
      */
     public void createTopic(@Nonnull TubeClusterInfo tubeCluster, String topicName, String operator) {
-        String masterUrl = tubeCluster.getUrl();
+        String masterUrl = tubeCluster.getMasterWebUrl();
         LOGGER.info("begin to create tube topic {} in master {}", topicName, masterUrl);
         if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(topicName)) {
             throw new BusinessException("tube master url or tube topic cannot be null");
@@ -88,7 +88,7 @@ public class TubeMQOperator {
      * Create consumer group for the given tube topic and cluster.
      */
     public void createConsumerGroup(TubeClusterInfo tubeCluster, String topic, String consumerGroup, String operator) {
-        String masterUrl = tubeCluster.getUrl();
+        String masterUrl = tubeCluster.getMasterWebUrl();
         LOGGER.info("begin to create consumer group {} for topic {} in master {}", consumerGroup, topic, masterUrl);
         if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(consumerGroup) || StringUtils.isEmpty(topic)) {
             throw new BusinessException("tube master url, consumer group, or tube topic cannot be null");