You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/30 02:16:49 UTC

[inlong] branch master updated: [INLONG-4790][SDK] Upgrade DES encryption decryption to AES (#4795)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f9d75c0c [INLONG-4790][SDK] Upgrade DES encryption decryption to AES (#4795)
9f9d75c0c is described below

commit 9f9d75c0ce1bb40d36a482b8ff53d57ba1de7994
Author: woofyzhao <49...@qq.com>
AuthorDate: Thu Jun 30 10:16:44 2022 +0800

    [INLONG-4790][SDK] Upgrade DES encryption decryption to AES (#4795)
---
 .../sdk/dataproxy/codec/ProtocolEncoder.java       |  41 +++----
 .../sdk/dataproxy/config/EncryptConfigEntry.java   |  49 ++++----
 .../inlong/sdk/dataproxy/config/EncryptInfo.java   |  15 +--
 .../sdk/dataproxy/example/UdpClientExample.java    |  52 +++++----
 .../inlong/sdk/dataproxy/utils/EncryptUtil.java    | 129 +++++++--------------
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     |  15 +--
 .../inlong/sdk/dataproxy/EncryptUtilTest.java      |  42 +++++++
 7 files changed, 175 insertions(+), 168 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
index b9b68a0d6..73f809015 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
@@ -18,22 +18,10 @@
 
 package org.apache.inlong.sdk.dataproxy.codec;
 
-import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_AUTH;
-import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_COMPRESS;
-import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_ENCRYPT;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageEncoder;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.security.SecureRandom;
-import java.util.Iterator;
-
-import java.util.List;
 import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.EncryptInfo;
 import org.apache.inlong.sdk.dataproxy.network.Utils;
@@ -42,7 +30,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xerial.snappy.Snappy;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_AUTH;
+import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_COMPRESS;
+import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_ENCRYPT;
+
 public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> {
+
     private static final Logger logger = LoggerFactory
             .getLogger(ProtocolEncoder.class);
 
@@ -119,7 +120,7 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> {
     }
 
     private ByteBuf constructBody(byte[] body, EncodeObject object,
-        int totalLength, int cnt) throws UnsupportedEncodingException {
+            int totalLength, int cnt) throws UnsupportedEncodingException {
         ByteBuf buf = null;
         if (body != null) {
             if (object.isCompress()) {
@@ -134,9 +135,9 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> {
                     }
                     EncryptInfo encryptInfo = encryptEntry.getRsaEncryptInfo();
                     endAttr = endAttr + "_userName=" + object.getUserName()
-                        + "&_encyVersion=" + encryptInfo.getVersion()
-                        + "&_encyDesKey=" + encryptInfo.getRsaEncryptedKey();
-                    body = EncryptUtil.desEncrypt(body, encryptInfo.getDesKey());
+                            + "&_encyVersion=" + encryptInfo.getVersion()
+                            + "&_encyAesKey=" + encryptInfo.getRsaEncryptedKey();
+                    body = EncryptUtil.aesEncrypt(body, encryptInfo.getAesKey());
                 }
             }
             if (!object.isGroupIdTransfer()) {
@@ -287,8 +288,8 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> {
                         EncryptInfo encryptInfo = encryptEntry.getRsaEncryptInfo();
                         msgAttrs = msgAttrs + "_userName=" + object.getUserName()
                                 + "&_encyVersion=" + encryptInfo.getVersion()
-                                + "&_encyDesKey=" + encryptInfo.getRsaEncryptedKey();
-                        body = EncryptUtil.desEncrypt(body, encryptInfo.getDesKey());
+                                + "&_encyAesKey=" + encryptInfo.getRsaEncryptedKey();
+                        body = EncryptUtil.aesEncrypt(body, encryptInfo.getAesKey());
                     }
                 }
                 if (Utils.isNotBlank(object.getMsgUUID())) {
@@ -377,8 +378,8 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> {
                         EncryptInfo encryptInfo = encryptEntry.getRsaEncryptInfo();
                         msgAttrs = msgAttrs + "_userName=" + object.getUserName()
                                 + "&_encyVersion=" + encryptInfo.getVersion()
-                                + "&_encyDesKey=" + encryptInfo.getRsaEncryptedKey();
-                        body = EncryptUtil.desEncrypt(body, encryptInfo.getDesKey());
+                                + "&_encyAesKey=" + encryptInfo.getRsaEncryptedKey();
+                        body = EncryptUtil.aesEncrypt(body, encryptInfo.getAesKey());
                     }
                 }
                 if (Utils.isNotBlank(object.getMsgUUID())) {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
index 047ca52a9..1b1cfd0f3 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
@@ -18,24 +18,25 @@
 
 package org.apache.inlong.sdk.dataproxy.config;
 
-import java.net.URLEncoder;
-import java.security.interfaces.RSAPublicKey;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.codec.binary.Base64;
 import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URLEncoder;
+import java.security.interfaces.RSAPublicKey;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Created by lamberliu on 2016/5/13.
  */
 public class EncryptConfigEntry implements java.io.Serializable {
+
     private static final Logger logger = LoggerFactory.getLogger(EncryptConfigEntry.class);
     private String userName = "";
     private String version;
     private String pubKey;
-    private byte[] desKey;
+    private byte[] aesKey;
     private String rsaEncryptedKey;
     private AtomicLong lastUpdateTime = new AtomicLong(0);
 
@@ -43,7 +44,7 @@ public class EncryptConfigEntry implements java.io.Serializable {
         this.userName = userName;
         this.version = version;
         this.pubKey = pubKey;
-        this.desKey = null;
+        this.aesKey = null;
         this.rsaEncryptedKey = null;
         // this.rsaKey = EncryptUtil.loadPublicKeyByText(pubKey);
     }
@@ -52,27 +53,35 @@ public class EncryptConfigEntry implements java.io.Serializable {
         return version;
     }
 
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
     public String getPubKey() {
         return pubKey;
     }
 
+    public void setPubKey(String pubKey) {
+        this.pubKey = pubKey;
+    }
+
     public String getUserName() {
         return userName;
     }
 
-    public synchronized byte[] getDesKey() {
-        if (desKey == null) {
-            desKey = EncryptUtil.generateDesKey();
+    public synchronized byte[] getAesKey() {
+        if (aesKey == null) {
+            aesKey = EncryptUtil.generateAesKey();
         }
 
-        return desKey;
+        return aesKey;
     }
 
     public String getRsaEncryptedKey() {
         if (rsaEncryptedKey == null) {
             RSAPublicKey rsaKey = EncryptUtil.loadPublicKeyByText(pubKey);
             try {
-                byte[] encryptedKey = EncryptUtil.rsaEncrypt(rsaKey, getDesKey());
+                byte[] encryptedKey = EncryptUtil.rsaEncrypt(rsaKey, getAesKey());
                 String tmpKey = Base64.encodeBase64String(encryptedKey);
                 rsaEncryptedKey = URLEncoder.encode(tmpKey, "utf8");
                 this.lastUpdateTime.set(System.currentTimeMillis());
@@ -90,7 +99,7 @@ public class EncryptConfigEntry implements java.io.Serializable {
         EncryptInfo encryptInfo = null;
         long visitTime = this.lastUpdateTime.get();
         if (rsaEncryptedKey != null && (System.currentTimeMillis() - visitTime) <= 3 * 60 * 1000) {
-            encryptInfo = new EncryptInfo(this.version, this.rsaEncryptedKey, this.desKey);
+            encryptInfo = new EncryptInfo(this.version, this.rsaEncryptedKey, this.aesKey);
             if (visitTime == this.lastUpdateTime.get()) {
                 return encryptInfo;
             }
@@ -99,20 +108,20 @@ public class EncryptConfigEntry implements java.io.Serializable {
         synchronized (this.lastUpdateTime) {
             if (visitTime == this.lastUpdateTime.get()) {
                 RSAPublicKey rsaKey = EncryptUtil.loadPublicKeyByText(pubKey);
-                this.desKey = EncryptUtil.generateDesKey();
+                this.aesKey = EncryptUtil.generateAesKey();
                 try {
-                    byte[] encryptedKey = EncryptUtil.rsaEncrypt(rsaKey, this.desKey);
+                    byte[] encryptedKey = EncryptUtil.rsaEncrypt(rsaKey, this.aesKey);
                     String tmpKey = Base64.encodeBase64String(encryptedKey);
                     rsaEncryptedKey = URLEncoder.encode(tmpKey, "utf8");
                     this.lastUpdateTime.set(System.currentTimeMillis());
-                    return new EncryptInfo(this.version, this.rsaEncryptedKey, this.desKey);
+                    return new EncryptInfo(this.version, this.rsaEncryptedKey, this.aesKey);
                 } catch (Throwable e) {
                     logger.error("getRsaEncryptInfo failure, RSA Encrypt error {}", e);
                     return null;
                 }
             }
         }
-        return new EncryptInfo(this.version, this.rsaEncryptedKey, this.desKey);
+        return new EncryptInfo(this.version, this.rsaEncryptedKey, this.aesKey);
     }
 
     @Override
@@ -129,14 +138,6 @@ public class EncryptConfigEntry implements java.io.Serializable {
                 && (this.pubKey == info.getPubKey());
     }
 
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-    public void setPubKey(String pubKey) {
-        this.pubKey = pubKey;
-    }
-
     public String toString() {
         return "{\"version\":\"" + version + "\",\"public_key\":\"" + pubKey + "\",\"groupId\":\"" + userName + "\"}";
     }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptInfo.java
index 24ab29d33..3504225de 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptInfo.java
@@ -19,14 +19,15 @@
 package org.apache.inlong.sdk.dataproxy.config;
 
 public class EncryptInfo {
+
     private String version;
-    private byte[] desKey;
+    private byte[] aesKey;
     private String rsaEncryptedKey;
 
-    public EncryptInfo(String version, String rsaEncryptedKey, byte[] desKey) {
+    public EncryptInfo(String version, String rsaEncryptedKey, byte[] aesKey) {
         this.version = version;
         this.rsaEncryptedKey = rsaEncryptedKey;
-        this.desKey = desKey;
+        this.aesKey = aesKey;
     }
 
     public String getVersion() {
@@ -37,12 +38,12 @@ public class EncryptInfo {
         this.version = version;
     }
 
-    public byte[] getDesKey() {
-        return desKey;
+    public byte[] getAesKey() {
+        return aesKey;
     }
 
-    public void setDesKey(byte[] desKey) {
-        this.desKey = desKey;
+    public void setAesKey(byte[] aesKey) {
+        this.aesKey = aesKey;
     }
 
     public String getRsaEncryptedKey() {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
index 9a7522364..001fddbf4 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
@@ -18,9 +18,6 @@
 
 package org.apache.inlong.sdk.dataproxy.example;
 
-import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_COMPRESS;
-import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_ENCRYPT;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -31,16 +28,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.DatagramPacket;
 import io.netty.channel.socket.nio.NioDatagramChannel;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-
-import java.io.ByteArrayOutputStream;
-import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.Random;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.EncryptInfo;
@@ -51,12 +38,27 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xerial.snappy.Snappy;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_COMPRESS;
+import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_ENCRYPT;
+
 public class UdpClientExample {
 
     private static final Logger logger = LoggerFactory.getLogger(UdpClientExample.class);
 
     private static SequentialID idGenerator = new SequentialID(Utils.getLocalIp());
 
+    private static SecureRandom random = new SecureRandom();
+
     public static void main(String[] args) {
         long sentCount = 10;
         String groupId = "test_group_id";
@@ -92,6 +94,16 @@ public class UdpClientExample {
         }
     }
 
+    public static String getRandomString(int length) {
+        StringBuilder sb = new StringBuilder();
+        String string = "i am bus test client!";
+        for (int i = 0; i < length; i++) {
+            int number = random.nextInt(string.length());
+            sb.append(string.charAt(number));
+        }
+        return sb.toString();
+    }
+
     public boolean sendUdpMessage(Channel channel, String ip, int port, ByteBuf msg) {
         try {
             channel.writeAndFlush(new DatagramPacket(msg, new InetSocketAddress(ip, port))).sync();
@@ -187,9 +199,9 @@ public class UdpClientExample {
                         }
                         EncryptInfo encryptInfo = encryptEntry.getRsaEncryptInfo();
                         endAttr = endAttr + "_userName=" + object.getUserName() + "&_encyVersion="
-                                + encryptInfo.getVersion() + "&_encyDesKey="
+                                + encryptInfo.getVersion() + "&_encyAesKey="
                                 + encryptInfo.getRsaEncryptedKey();
-                        body = EncryptUtil.desEncrypt(body, encryptInfo.getDesKey());
+                        body = EncryptUtil.aesEncrypt(body, encryptInfo.getAesKey());
                     }
                 }
                 if (!object.isGroupIdTransfer()) {
@@ -281,14 +293,4 @@ public class UdpClientExample {
         }
         return channel;
     }
-
-    public static String getRandomString(int length) {
-        StringBuffer sb = new StringBuffer();
-        String string = "i am bus test client!";
-        for (int i = 0; i < length; i++) {
-            int number = new Random().nextInt(string.length());
-            sb.append(string.charAt(number));
-        }
-        return sb.toString();
-    }
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EncryptUtil.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EncryptUtil.java
index 87f475468..9846530cb 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EncryptUtil.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EncryptUtil.java
@@ -18,6 +18,17 @@
 
 package org.apache.inlong.sdk.dataproxy.utils;
 
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.KeyGenerator;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
@@ -29,39 +40,25 @@ import java.io.IOException;
 import java.security.InvalidKeyException;
 import java.security.KeyFactory;
 import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
 import java.security.interfaces.RSAPrivateKey;
 import java.security.interfaces.RSAPublicKey;
 import java.security.spec.InvalidKeySpecException;
 import java.security.spec.PKCS8EncodedKeySpec;
 import java.security.spec.X509EncodedKeySpec;
-import javax.crypto.BadPaddingException;
-import javax.crypto.Cipher;
-import javax.crypto.IllegalBlockSizeException;
-import javax.crypto.KeyGenerator;
-import javax.crypto.NoSuchPaddingException;
-import javax.crypto.SecretKey;
-import javax.crypto.SecretKeyFactory;
-import javax.crypto.spec.DESKeySpec;
-
-import org.apache.commons.codec.binary.Base64;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class EncryptUtil {
-    private static final Logger logger =
-            LoggerFactory.getLogger(EncryptUtil.class);
 
+    public static final String AES = "AES";
+    public static final int AES_KEY_SIZE = 128;
     public static final int MAX_ENCRYPT_BLOCK = 117;
-
     public static final int MAX_DECRYPT_BLOCK = 128;
-
-    public static final String DES = "DES";
+    private static final Logger logger = LoggerFactory.getLogger(EncryptUtil.class);
 
     /**
      * load key
      *
      * @param path path
+     *
      * @throws Exception exception
      */
     public static String loadPublicKeyByFileText(String path) throws Exception {
@@ -150,6 +147,7 @@ public class EncryptUtil {
      *
      * @param path key path
      * @return whether success
+     *
      * @throws Exception
      */
     public static String loadPrivateKeyByFileText(String path) throws Exception {
@@ -198,6 +196,7 @@ public class EncryptUtil {
      * load private key by text
      *
      * @param privateKeyStr private key
+     *
      * @throws Exception exception
      */
     public static RSAPrivateKey loadPrivateKeyByText(String privateKeyStr)
@@ -237,7 +236,7 @@ public class EncryptUtil {
      * key encrypt
      *
      * @param publicKey public key
-     * @param data      data
+     * @param data data
      * @return
      *
      * @throws Exception exception
@@ -270,9 +269,9 @@ public class EncryptUtil {
     }
 
     /**
-     * key encrypt
+     * key decrypt
      *
-     * @param privateKey    key
+     * @param privateKey key
      * @param encryptedData data
      * @return
      *
@@ -309,6 +308,7 @@ public class EncryptUtil {
      * @param privateKey private key
      * @param cipherData data
      * @return message
+     *
      * @throws Exception exception
      */
     public static byte[] rsaDecrypt(RSAPrivateKey privateKey, byte[] cipherData)
@@ -340,7 +340,7 @@ public class EncryptUtil {
     /**
      * rsa decrypt
      *
-     * @param publicKey  public key
+     * @param publicKey public key
      * @param cipherData cipher data
      * @return
      *
@@ -374,101 +374,60 @@ public class EncryptUtil {
     }
 
     /**
-     * generate des key
+     * generate AES key
      *
      * @return base64 key
      */
-    public static byte[] generateDesKey() {
+    public static byte[] generateAesKey() {
 
         KeyGenerator kg = null;
         try {
-            kg = KeyGenerator.getInstance("DES");
+            kg = KeyGenerator.getInstance(AES);
         } catch (NoSuchAlgorithmException e) {
-            logger.error("generate Des key error {}", e);
+            logger.error("generate Aes key error {}", e);
         }
 
-        kg.init(56);
+        kg.init(AES_KEY_SIZE);
 
         SecretKey secretKey = kg.generateKey();
         return secretKey.getEncoded();
     }
 
     /**
-     * des encrypt
+     * AES encrypt
      *
      * @param plainText
-     * @param desKey
+     * @param aesKey
      * @return
      */
-    public static byte[] desEncrypt(byte[] plainText, byte[] desKey) {
+    public static byte[] aesEncrypt(byte[] plainText, byte[] aesKey) {
         try {
-//            byte[] buffer = Base64.decodeBase64(DesKey);
-
-            SecureRandom sr = SecureRandom.getInstance("SHA1PRNG");
-
-            DESKeySpec dks = new DESKeySpec(desKey);
-
-            SecretKeyFactory keyFactory = SecretKeyFactory.getInstance(DES);
-            SecretKey key = keyFactory.generateSecret(dks);
-
-            Cipher cipher = Cipher.getInstance(DES);
-            cipher.init(Cipher.ENCRYPT_MODE, key);
-
-            byte[] encryptedData = cipher.doFinal(plainText);
-
-            return encryptedData;
+            SecretKeySpec secretKeySpec = new SecretKeySpec(aesKey, AES);
+            Cipher cipher = Cipher.getInstance(AES);
+            cipher.init(Cipher.ENCRYPT_MODE, secretKeySpec);
+            return cipher.doFinal(plainText);
         } catch (Exception e) {
-            logger.error("desEncrypt error {}", e);
+            logger.error("aesEncrypt error {}", e);
             return null;
         }
     }
 
     /**
-     * des decrypt
+     * AES decrypt
      *
-     * @param plainText
-     * @param desKey
+     * @param cipherText
+     * @param aesKey
      * @return des decrypt
      */
-    public static byte[] dESDecrypt(byte[] plainText, byte[] desKey) {
+    public static byte[] aesDecrypt(byte[] cipherText, byte[] aesKey) {
         try {
-            SecureRandom sr = SecureRandom.getInstance("SHA1PRNG");
-
-            DESKeySpec dks = new DESKeySpec(desKey);
-            SecretKeyFactory keyFactory = SecretKeyFactory.getInstance(DES);
-            SecretKey key = keyFactory.generateSecret(dks);
-
-            Cipher cipher = Cipher.getInstance(DES);
-
-            cipher.init(Cipher.DECRYPT_MODE, key);
-
-            byte[] decryptedData = cipher.doFinal(plainText);
-
-//            System.out.println("decrypted data");
-//            System.out.println(new String(decryptedData));
-
-            return decryptedData;
+            SecretKeySpec secretKeySpec = new SecretKeySpec(aesKey, AES);
+            Cipher cipher = Cipher.getInstance(AES);
+            cipher.init(Cipher.DECRYPT_MODE, secretKeySpec);
+            return cipher.doFinal(cipherText);
         } catch (Exception e) {
-//            e.printStackTrace();
-            logger.error("dESDecrypt error {}", e);
+            logger.error("aesDecrypt error {}", e);
             return null;
         }
     }
-
-    public static void main(String[] args) {
-        String plainText = "TDB-30001 Create Tdw Table Error26880 FAILED: "
-                + "TDWServer run SQL error (session: 6425308280519064 query: CREATE TABLE a";
-        System.out.println("plainText: \n" + plainText);
-        byte[] key = new byte[0];
-        try {
-            key = generateDesKey();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        byte[] encryptedData = desEncrypt(plainText.getBytes(), key);
-        System.out.println("after encrypted: \n" + new String(encryptedData));
-        byte[] decryptedData = dESDecrypt(encryptedData, key);
-        System.out.println("after decrypted: \n" + new String(decryptedData));
-
-    }
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index e3c29ddde..2a46f0c8e 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -18,26 +18,27 @@
 
 package org.apache.inlong.sdk.dataproxy.utils;
 
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.network.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-
 import java.util.Set;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.network.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class ProxyUtils {
+
     private static final Logger logger = LoggerFactory.getLogger(ProxyUtils.class);
     private static final int TIME_LENGTH = 13;
     private static final Set<String> invalidAttr = new HashSet<>();
 
     static {
         Collections.addAll(invalidAttr, "groupId", "streamId", "dt", "msgUUID", "cp",
-            "cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", "_file_status_check", "_secretId",
-            "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyDesKey");
+                "cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", "_file_status_check", "_secretId",
+                "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyAesKey");
     }
 
     public static boolean isAttrKeysValid(Map<String, String> attrsMap) {
diff --git a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EncryptUtilTest.java b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EncryptUtilTest.java
new file mode 100644
index 000000000..79a1ec78b
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EncryptUtilTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class EncryptUtilTest {
+
+    @Test
+    public void testEncryptDecrypt() {
+        String plainText = "some arbitrary text to encrypt";
+
+        byte[] key = EncryptUtil.generateAesKey();
+        Assert.assertEquals(key.length, EncryptUtil.AES_KEY_SIZE / 8);
+
+        byte[] encryptedData = EncryptUtil.aesEncrypt(plainText.getBytes(StandardCharsets.UTF_8), key);
+        Assert.assertTrue(encryptedData.length > 0);
+
+        byte[] decryptedData = EncryptUtil.aesDecrypt(encryptedData, key);
+        Assert.assertArrayEquals(decryptedData, (plainText.getBytes(StandardCharsets.UTF_8)));
+    }
+}