You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/04 07:43:14 UTC

[incubator-inlong] branch master updated: [INLONG-1741] [dataproxy] Modify bid and tid (or dsid) to inlongGroupId and inlongStreamId (#1743)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e8e64ed  [INLONG-1741] [dataproxy] Modify bid and tid (or dsid) to inlongGroupId and inlongStreamId (#1743)
e8e64ed is described below

commit e8e64ed35eb84511bb9d7b49d1e42820660db609
Author: dockerzhang <do...@apache.org>
AuthorDate: Thu Nov 4 15:43:09 2021 +0800

    [INLONG-1741] [dataproxy] Modify bid and tid (or dsid) to inlongGroupId and inlongStreamId (#1743)
    
    Co-authored-by: dockerzhang <do...@tencent.com>
---
 .../inlong/commons/msg/AttributeConstants.java     |   8 +-
 .../java/org/apache/inlong/commons/msg/TDMsg1.java |  26 +--
 .../inlong/dataproxy/DefaultMessageSender.java     | 213 +++++++++++----------
 .../org/apache/inlong/dataproxy/MessageSender.java |  28 +--
 .../apache/inlong/dataproxy/ProxyClientConfig.java |  50 ++---
 .../inlong/dataproxy/codec/EncodeObject.java       |  99 +++++-----
 .../inlong/dataproxy/codec/ProtocolEncoder.java    |  10 +-
 .../dataproxy/config/EncryptConfigEntry.java       |   2 +-
 .../inlong/dataproxy/config/ProxyConfigEntry.java  |  33 ++--
 .../dataproxy/config/ProxyConfigManager.java       |  60 +++---
 .../org/apache/inlong/dataproxy/demo/Event.java    |  32 ++--
 .../inlong/dataproxy/http/InternalHttpSender.java  |  30 +--
 .../inlong/dataproxy/metric/MessageRecord.java     |  19 +-
 .../apache/inlong/dataproxy/network/ClientMgr.java |  40 ++--
 .../inlong/dataproxy/network/HttpMessage.java      |  18 +-
 .../inlong/dataproxy/network/HttpProxySender.java  |  40 ++--
 .../apache/inlong/dataproxy/network/Sender.java    |  88 ++++-----
 .../dataproxy/threads/MetricWorkerThread.java      |  35 ++--
 .../apache/inlong/dataproxy/utils/ProxyUtils.java  |   2 +-
 .../dataproxy/utils/ServiceDiscoveryUtils.java     |   2 +-
 inlong-dataproxy/bin/prepare_env.sh                |   2 +-
 .../apache/inlong/dataproxy/base/ProxyMessage.java |  30 +--
 .../inlong/dataproxy/config/ConfigManager.java     |  32 ++--
 .../inlong/dataproxy/config/RemoteConfigJson.java  |   6 +-
 .../dataproxy/config/RemoteConfigManager.java      |   2 +-
 ...iesHolder.java => GroupIdPropertiesHolder.java} |  50 ++---
 .../config/remote/ConfigMessageServlet.java        |  16 +-
 .../dataproxy/consts/AttributeConstants.java       |  12 +-
 .../inlong/dataproxy/consts/ConfigConstants.java   |   2 +-
 .../org/apache/inlong/dataproxy/sink/MetaSink.java |  28 +--
 .../apache/inlong/dataproxy/sink/PulsarSink.java   |  10 +-
 .../dataproxy/sink/pulsar/PulsarClientService.java |   8 +-
 .../dataproxy/source/DefaultServiceDecoder.java    |  36 ++--
 .../dataproxy/source/ServerMessageHandler.java     |  92 ++++-----
 34 files changed, 586 insertions(+), 575 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java b/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
index 857d7f0..8811b14 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
@@ -23,20 +23,20 @@ public interface AttributeConstants {
     String KEY_VALUE_SEPARATOR = "=";
 
     /**
-     * business id
+     * group id
      * unique string id for each business or product
      */
-    String BUSINESS_ID = "bid";
+    String GROUP_ID = "groupId";
 
     /**
      * interface id
      * unique string id for each interface of business
      * An interface stand for a kind of data
      */
-    String INTERFACE_ID = "tid";
+    String INTERFACE_ID = "streamId";
 
     /**
-     * iname is like a tid but used in file protocol(m=xxx)
+     * iname is like a streamId but used in file protocol(m=xxx)
      */
     String INAME = "iname";
 
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
index 4257854..506c4c6 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
@@ -38,8 +38,8 @@ public class TDMsg1 {
     private static final int BIN_MSG_SNAPPY_TYPE = 1;
 
     private static final int BIN_MSG_TOTALLEN_OFFSET = 0;
-    private static final int BIN_MSG_BID_OFFSET = 5;
-    private static final int BIN_MSG_TID_OFFSET = 7;
+    private static final int BIN_MSG_GROUPID_OFFSET = 5;
+    private static final int BIN_MSG_STREAMID_OFFSET = 7;
     private static final int BIN_MSG_EXTFIELD_OFFSET = 9;
     private static final int BIN_MSG_COUNT_OFFSET = 15;
     private static final int BIN_MSG_DATATIME_OFFSET = 11;
@@ -96,7 +96,7 @@ public class TDMsg1 {
     private int datalen = 0;
     private int msgcnt = 0;
     private boolean compress;
-    private boolean isNumBid = false;
+    private boolean isNumGroupId = false;
     private boolean ischeck = true;
 
     private final Version version;
@@ -634,7 +634,7 @@ public class TDMsg1 {
             parsedBinInput = ByteBuffer.wrap(binMsg);
             this.createtime = getBinCreatetime(parsedBinInput);
             this.msgcnt = getBinMsgCnt(parsedBinInput);
-            this.isNumBid = getBinNumFlag(parsedBinInput);
+            this.isNumGroupId = getBinNumFlag(parsedBinInput);
         }
     }
 
@@ -736,8 +736,8 @@ public class TDMsg1 {
 
         int totalLen = parsedBinInput.getInt(BIN_MSG_TOTALLEN_OFFSET);
         final int msgtype = parsedBinInput.get(BIN_MSG_MSGTYPE_OFFSET);
-        int bidNum = parsedBinInput.getShort(BIN_MSG_BID_OFFSET);
-        int tidNum = parsedBinInput.getShort(BIN_MSG_TID_OFFSET);
+        int groupIdNum = parsedBinInput.getShort(BIN_MSG_GROUPID_OFFSET);
+        int streamIdNum = parsedBinInput.getShort(BIN_MSG_STREAMID_OFFSET);
         int bodyLen = parsedBinInput.getInt(BIN_MSG_BODYLEN_OFFSET);
         long dataTime = parsedBinInput.getInt(BIN_MSG_DATATIME_OFFSET);
         final int extField = parsedBinInput.getShort(BIN_MSG_EXTFIELD_OFFSET);
@@ -783,11 +783,11 @@ public class TDMsg1 {
                 break;
         }
 
-        //number bid/tid
-        boolean isUseNumBid = ((extField & 0x4) == 0x0);
-        if (isUseNumBid) {
-            commonAttrMap.put(AttributeConstants.BUSINESS_ID, String.valueOf(bidNum));
-            commonAttrMap.put(AttributeConstants.INTERFACE_ID, String.valueOf(tidNum));
+        //number groupId/streamId
+        boolean isUseNumGroupId = ((extField & 0x4) == 0x0);
+        if (isUseNumGroupId) {
+            commonAttrMap.put(AttributeConstants.GROUP_ID, String.valueOf(groupIdNum));
+            commonAttrMap.put(AttributeConstants.INTERFACE_ID, String.valueOf(streamIdNum));
         }
 
         boolean hasOtherAttr = ((extField & 0x1) == 0x1);
@@ -1099,8 +1099,8 @@ public class TDMsg1 {
         return attrcnt;
     }
 
-    public boolean isNumBid() {
+    public boolean isNumGroupId() {
         checkMode(false);
-        return isNumBid;
+        return isNumGroupId;
     }
 }
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/DefaultMessageSender.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/DefaultMessageSender.java
index f91635d..4254032 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/DefaultMessageSender.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/DefaultMessageSender.java
@@ -42,10 +42,10 @@ public class DefaultMessageSender implements MessageSender {
     private static final Logger logger = LoggerFactory.getLogger(DefaultMessageSender.class);
     private final Sender sender;
     private final SequentialID idGenerator;
-    private String bid;
+    private String groupId;
     private int msgtype = ConfigConstants.MSG_TYPE;
     private boolean isCompress = true;
-    private boolean isBidTransfer = false;
+    private boolean isGroupIdTransfer = false;
     private boolean isReport = false;
     private boolean isSupportLF = false;
     private int cpsSize = ConfigConstants.COMPRESS_SIZE;
@@ -54,7 +54,7 @@ public class DefaultMessageSender implements MessageSender {
             new ConcurrentHashMap<>();
 
     private final IndexCollectThread indexCol;
-    /* Store index <bid_tid,cnt>*/
+    /* Store index <groupId_streamId,cnt>*/
     private final Map<String, Long> storeIndex = new ConcurrentHashMap<String, Long>();
 
     private static final AtomicBoolean ManagerFetcherThreadStarted = new AtomicBoolean(false);
@@ -69,12 +69,12 @@ public class DefaultMessageSender implements MessageSender {
         isSupportLF = supportLF;
     }
 
-    public boolean isBidTransfer() {
-        return isBidTransfer;
+    public boolean isGroupIdTransfer() {
+        return isGroupIdTransfer;
     }
 
-    public void setBidTransfer(boolean isBidTransfer) {
-        this.isBidTransfer = isBidTransfer;
+    public void setGroupIdTransfer(boolean isGroupIdTransfer) {
+        this.isGroupIdTransfer = isGroupIdTransfer;
     }
 
     public boolean isReport() {
@@ -109,12 +109,12 @@ public class DefaultMessageSender implements MessageSender {
         this.isCompress = isCompress;
     }
 
-    public String getBid() {
-        return bid;
+    public String getGroupId() {
+        return groupId;
     }
 
-    public void setBid(String bid) {
-        this.bid = bid;
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
     }
 
     public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
@@ -125,7 +125,7 @@ public class DefaultMessageSender implements MessageSender {
         ProxyUtils.validClientConfig(configure);
         sender = new Sender(configure, selfDefineFactory);
         idGenerator = new SequentialID(Utils.getLocalIp());
-        bid = configure.getBid();
+        groupId = configure.getGroupId();
         indexCol = new IndexCollectThread(storeIndex);
         indexCol.start();
 
@@ -161,8 +161,8 @@ public class DefaultMessageSender implements MessageSender {
                                                                  ChannelFactory selfDefineFactory) throws Exception {
         ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure,
                 Utils.getLocalIp(), null);
-        proxyConfigManager.setBusinessId(configure.getBid());
-        ProxyConfigEntry entry = proxyConfigManager.getBidConfigure();
+        proxyConfigManager.setGroupId(configure.getGroupId());
+        ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
         DefaultMessageSender sender = cacheSender.get(entry.getClusterId());
         if (sender != null) {
             return sender;
@@ -195,43 +195,44 @@ public class DefaultMessageSender implements MessageSender {
                 idGenerator.getNextId()), msgUUID, timeout, timeUnit);
     }
 
-    public SendResult sendMessage(byte[] body, String bid, String tid, long dt, String msgUUID,
+    public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
                                   long timeout, TimeUnit timeUnit) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
-        addIndexCnt(bid, tid, 1);
+        addIndexCnt(groupId, streamId, 1);
 
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
 
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport,
-                    isBidTransfer, dt / 1000, idGenerator.getNextInt(), bid, tid, "");
+                    isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, "");
             encodeObject.setSupportLF(isSupportLF);
             return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isCompressEnd) {
-                return sender.syncSendMessage(new EncodeObject(body, "bid=" + bid
-                        + "&tid=" + tid + "&dt=" + dt + "&cp=snappy",
-                        idGenerator.getNextId(), this.getMsgtype(), true, bid), msgUUID, timeout, timeUnit);
+                return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId
+                        + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
+                        idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
             } else {
-                return sender.syncSendMessage(new EncodeObject(body, "bid=" + bid + "&tid=" + tid + "&dt=" + dt,
-                        idGenerator.getNextId(), this.getMsgtype(), false, bid), msgUUID, timeout, timeUnit);
+                return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId
+                        + "&streamId=" + streamId + "&dt=" + dt,
+                        idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
             }
         }
 
         return null;
     }
 
-    public SendResult sendMessage(byte[] body, String bid, String tid, long dt, String msgUUID,
+    public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
                                   long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
 
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
-        addIndexCnt(bid, tid, 1);
+        addIndexCnt(groupId, streamId, 1);
 
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
@@ -239,79 +240,79 @@ public class DefaultMessageSender implements MessageSender {
 
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport,
-                    isBidTransfer, dt / 1000,
-                    idGenerator.getNextInt(), bid, tid, attrs.toString());
+                    isGroupIdTransfer, dt / 1000,
+                    idGenerator.getNextInt(), groupId, streamId, attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
             return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
-            attrs.append("&bid=").append(bid).append("&tid=").append(tid).append("&dt=").append(dt);
+            attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
             if (isCompressEnd) {
                 attrs.append("&cp=snappy");
                 return sender.syncSendMessage(new EncodeObject(body, attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), true, bid), msgUUID, timeout, timeUnit);
+                        idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
             } else {
                 return sender.syncSendMessage(new EncodeObject(body, attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), false, bid), msgUUID, timeout, timeUnit);
+                        idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
             }
         }
         return null;
 
     }
 
-    public SendResult sendMessage(List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+    public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
                                   long timeout, TimeUnit timeUnit) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
-        addIndexCnt(bid, tid, bodyList.size());
+        addIndexCnt(groupId, streamId, bodyList.size());
 
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress, isReport,
-                    isBidTransfer, dt / 1000,
-                    idGenerator.getNextInt(), bid, tid, "");
+                    isGroupIdTransfer, dt / 1000,
+                    idGenerator.getNextInt(), groupId, streamId, "");
             encodeObject.setSupportLF(isSupportLF);
             return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isCompress) {
-                return sender.syncSendMessage(new EncodeObject(bodyList, "bid=" + bid + "&tid=" + tid
+                return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
                         + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(),
-                        idGenerator.getNextId(), this.getMsgtype(), true, bid), msgUUID, timeout, timeUnit);
+                        idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
             } else {
-                return sender.syncSendMessage(new EncodeObject(bodyList, "bid=" + bid + "&tid=" + tid
+                return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
                         + "&dt=" + dt + "&cnt=" + bodyList.size(),
-                        idGenerator.getNextId(), this.getMsgtype(), false, bid), msgUUID, timeout, timeUnit);
+                        idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
             }
         }
         return null;
     }
 
-    public SendResult sendMessage(List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+    public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
                                   long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
             || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
-        addIndexCnt(bid, tid, bodyList.size());
+        addIndexCnt(groupId, streamId, bodyList.size());
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress, isReport,
-                    isBidTransfer, dt / 1000,
-                    idGenerator.getNextInt(), bid, tid, attrs.toString());
+                    isGroupIdTransfer, dt / 1000,
+                    idGenerator.getNextInt(), groupId, streamId, attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
             return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
-            attrs.append("&bid=").append(bid).append("&tid=").append(tid)
+            attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
                     .append("&dt=").append(dt).append("&cnt=").append(bodyList.size());
             if (isCompress) {
                 attrs.append("&cp=snappy");
                 return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), true, bid), msgUUID, timeout, timeUnit);
+                        idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
             } else {
                 return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), false, bid), msgUUID, timeout, timeUnit);
+                        idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
             }
         }
         return null;
@@ -325,101 +326,104 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public void asyncSendMessage(SendMessageCallback callback, byte[] body,
-                                 String bid, String tid, long dt, String msgUUID,
+                                 String groupId, String streamId, long dt, String msgUUID,
                                  long timeout, TimeUnit timeUnit) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
-        addIndexCnt(bid, tid, 1);
+        addIndexCnt(groupId, streamId, 1);
 
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(body, this.getMsgtype(), isCompressEnd, isReport,
-                    isBidTransfer, dt / 1000, idGenerator.getNextInt(),
-                    bid, tid, "");
+                    isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
+                    groupId, streamId, "");
             encodeObject.setSupportLF(isSupportLF);
             sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isCompressEnd) {
-                sender.asyncSendMessage(new EncodeObject(body, "bid="
-                        + bid + "&tid=" + tid + "&dt=" + dt + "&cp=snappy",
-                        idGenerator.getNextId(), this.getMsgtype(), true, bid), callback, msgUUID, timeout, timeUnit);
+                sender.asyncSendMessage(new EncodeObject(body, "groupId="
+                        + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
+                        idGenerator.getNextId(), this.getMsgtype(), true, groupId),
+                        callback, msgUUID, timeout, timeUnit);
             } else {
                 sender.asyncSendMessage(
-                        new EncodeObject(body, "bid=" + bid + "&tid=" + tid + "&dt=" + dt, idGenerator.getNextId(),
-                                this.getMsgtype(), false, bid), callback, msgUUID, timeout, timeUnit);
+                        new EncodeObject(body, "groupId=" + groupId + "&streamId="
+                                + streamId + "&dt=" + dt, idGenerator.getNextId(),
+                                this.getMsgtype(), false, groupId), callback,
+                        msgUUID, timeout, timeUnit);
             }
         }
 
     }
 
     public void asyncSendMessage(SendMessageCallback callback,
-                                 byte[] body, String bid, String tid, long dt, String msgUUID,
+                                 byte[] body, String groupId, String streamId, long dt, String msgUUID,
                                  long timeout, TimeUnit timeUnit,
                                  Map<String, String> extraAttrMap) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
-        addIndexCnt(bid, tid, 1);
+        addIndexCnt(groupId, streamId, 1);
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(body, this.getMsgtype(), isCompressEnd,
-                    isReport, isBidTransfer, dt / 1000, idGenerator.getNextInt(),
-                    bid, tid, attrs.toString());
+                    isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
+                    groupId, streamId, attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
             sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
-            attrs.append("&bid=").append(bid).append("&tid=").append(tid).append("&dt=").append(dt);
+            attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
             if (isCompressEnd) {
                 attrs.append("&cp=snappy");
                 sender.asyncSendMessage(new EncodeObject(body, attrs.toString(),
-                                idGenerator.getNextId(), this.getMsgtype(), true, bid),
+                                idGenerator.getNextId(), this.getMsgtype(), true, groupId),
                         callback, msgUUID, timeout, timeUnit);
             } else {
                 sender.asyncSendMessage(new EncodeObject(body, attrs.toString(), idGenerator.getNextId(),
-                                this.getMsgtype(), false, bid),
+                                this.getMsgtype(), false, groupId),
                         callback, msgUUID, timeout, timeUnit);
             }
         }
     }
 
     public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList,
-                                 String bid, String tid, long dt, String msgUUID,
+                                 String groupId, String streamId, long dt, String msgUUID,
                                  long timeout, TimeUnit timeUnit) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
-        addIndexCnt(bid, tid, bodyList.size());
+        addIndexCnt(groupId, streamId, bodyList.size());
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, this.getMsgtype(), isCompress,
-                    isReport, isBidTransfer, dt / 1000, idGenerator.getNextInt(),
-                    bid, tid, "");
+                    isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
+                    groupId, streamId, "");
             encodeObject.setSupportLF(isSupportLF);
             sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isCompress) {
                 sender.asyncSendMessage(
-                        new EncodeObject(bodyList, "bid=" + bid + "&tid=" + tid
+                        new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
                                 + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(),
                                 idGenerator.getNextId(), this.getMsgtype(),
-                                true, bid), callback, msgUUID, timeout, timeUnit);
+                                true, groupId), callback, msgUUID, timeout, timeUnit);
             } else {
                 sender.asyncSendMessage(
-                        new EncodeObject(bodyList, "bid=" + bid + "&tid="
-                                + tid + "&dt=" + dt + "&cnt=" + bodyList.size(),
+                        new EncodeObject(bodyList, "groupId=" + groupId + "&streamId="
+                                + streamId + "&dt=" + dt + "&cnt=" + bodyList.size(),
                                 idGenerator.getNextId(), this.getMsgtype(),
-                                false, bid), callback, msgUUID, timeout, timeUnit);
+                                false, groupId), callback, msgUUID, timeout, timeUnit);
             }
         }
     }
 
     public void asyncSendMessage(SendMessageCallback callback,
-                                 List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+                                 List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
                                  long timeout, TimeUnit timeUnit,
                                  Map<String, String> extraAttrMap) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
@@ -427,34 +431,34 @@ public class DefaultMessageSender implements MessageSender {
             || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
-        addIndexCnt(bid, tid, bodyList.size());
+        addIndexCnt(groupId, streamId, bodyList.size());
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
-//            if (!isBidTransfer)
+//            if (!isGroupIdTransfer)
             EncodeObject encodeObject = new EncodeObject(bodyList, this.getMsgtype(),
-                    isCompress, isReport, isBidTransfer, dt / 1000, idGenerator.getNextInt(),
-                    bid, tid, attrs.toString());
+                    isCompress, isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
+                    groupId, streamId, attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
             sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
-            attrs.append("&bid=").append(bid).append("&tid=").append(tid)
+            attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
                     .append("&dt=").append(dt).append("&cnt=").append(bodyList.size());
             if (isCompress) {
                 attrs.append("&cp=snappy");
                 sender.asyncSendMessage(new EncodeObject(bodyList, attrs.toString(), idGenerator.getNextId(),
-                        this.getMsgtype(), true, bid), callback, msgUUID, timeout, timeUnit);
+                        this.getMsgtype(), true, groupId), callback, msgUUID, timeout, timeUnit);
             } else {
                 sender.asyncSendMessage(new EncodeObject(bodyList, attrs.toString(), idGenerator.getNextId(),
-                        this.getMsgtype(), false, bid), callback, msgUUID, timeout, timeUnit);
+                        this.getMsgtype(), false, groupId), callback, msgUUID, timeout, timeUnit);
             }
         }
 
     }
 
-    private void addIndexCnt(String bid, String tid, long cnt) {
+    private void addIndexCnt(String groupId, String streamId, long cnt) {
         try {
-            String key = bid + "|" + tid;
+            String key = groupId + "|" + streamId;
             if (storeIndex.containsKey(key)) {
                 long sum = storeIndex.get(key);
                 storeIndex.put(key, sum + cnt);
@@ -466,8 +470,8 @@ public class DefaultMessageSender implements MessageSender {
         }
     }
 
-    public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String bid,
-                                     String tid, long dt, int sid, boolean isSupportLF, String msgUUID,
+    public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId,
+                                     String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
                                      long timeout, TimeUnit timeUnit,
                                      Map<String, String> extraAttrMap) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
@@ -475,21 +479,21 @@ public class DefaultMessageSender implements MessageSender {
             || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
-        addIndexCnt(bid, tid, bodyList.size());
+        addIndexCnt(groupId, streamId, bodyList.size());
 
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
-                    isCompress, isReport, isBidTransfer,
-                    dt / 1000, sid, bid, tid, attrs.toString(), "data", "");
+                    isCompress, isReport, isGroupIdTransfer,
+                    dt / 1000, sid, groupId, streamId, attrs.toString(), "data", "");
             encodeObject.setSupportLF(isSupportLF);
             sender.asyncSendMessageIndex(encodeObject, callback, msgUUID, timeout, timeUnit);
         }
     }
 
-    private void asyncSendMetric(FileCallback callback, byte[] body, String bid,
-                                 String tid, long dt, int sid, String ip, String msgUUID,
+    private void asyncSendMetric(FileCallback callback, byte[] body, String groupId,
+                                 String streamId, long dt, int sid, String ip, String msgUUID,
                                  long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
@@ -498,46 +502,46 @@ public class DefaultMessageSender implements MessageSender {
         boolean isCompressEnd = false;
         if (msgtype == 7 || msgtype == 8) {
             sender.asyncSendMessageIndex(new EncodeObject(body, msgtype, isCompressEnd,
-                    isReport, isBidTransfer, dt / 1000,
-                    sid, bid, tid, "", messageKey, ip), callback, msgUUID, timeout, timeUnit);
+                    isReport, isGroupIdTransfer, dt / 1000,
+                    sid, groupId, streamId, "", messageKey, ip), callback, msgUUID, timeout, timeUnit);
         }
     }
 
-    public void asyncsendMessageProxy(FileCallback callback, byte[] body, String bid, String tid,
+    public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId,
                                     long dt, int sid, String ip, String msgUUID,
                                     long timeout, TimeUnit timeUnit) throws ProxysdkException {
-        asyncSendMetric(callback, body, bid, tid, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
+        asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
     }
 
-    public void asyncsendMessageFile(FileCallback callback, byte[] body, String bid,
-                                     String tid, long dt, int sid, String msgUUID,
+    public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId,
+                                     String streamId, long dt, int sid, String msgUUID,
                                      long timeout, TimeUnit timeUnit) throws ProxysdkException {
-        asyncSendMetric(callback, body, bid, tid, dt, sid, "", msgUUID, timeout, timeUnit, "file");
+        asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
     }
 
-    public String sendMessageData(List<byte[]> bodyList, String bid,
-                                  String tid, long dt, int sid, boolean isSupportLF, String msgUUID,
+    public String sendMessageData(List<byte[]> bodyList, String groupId,
+                                  String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
                                   long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
             || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             return SendResult.INVALID_ATTRIBUTES.toString();
         }
-        addIndexCnt(bid, tid, bodyList.size());
+        addIndexCnt(groupId, streamId, bodyList.size());
 
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress,
-                    isReport, isBidTransfer, dt / 1000,
-                    sid, bid, tid, attrs.toString(), "data", "");
+                    isReport, isGroupIdTransfer, dt / 1000,
+                    sid, groupId, streamId, attrs.toString(), "data", "");
             encodeObject.setSupportLF(isSupportLF);
             return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
         }
         return null;
     }
 
-    private String sendMetric(byte[] body, String bid, String tid, long dt, int sid, String ip, String msgUUID,
+    private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, String msgUUID,
                               long timeout, TimeUnit timeUnit, String messageKey) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
@@ -545,20 +549,21 @@ public class DefaultMessageSender implements MessageSender {
         }
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(body, msgtype, false, isReport,
-                    isBidTransfer, dt / 1000, sid, bid, tid, "", messageKey, ip);
+                    isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", messageKey, ip);
             return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
         }
         return null;
     }
 
-    public String sendMessageProxy(byte[] body, String bid, String tid, long dt, int sid, String ip, String msgUUID,
+    public String sendMessageProxy(byte[] body, String groupId, String streamId,
+                                   long dt, int sid, String ip, String msgUUID,
                                  long timeout, TimeUnit timeUnit) {
-        return sendMetric(body, bid, tid, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
+        return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
     }
 
-    public String sendMessageFile(byte[] body, String bid, String tid, long dt, int sid, String msgUUID,
+    public String sendMessageFile(byte[] body, String groupId, String streamId, long dt, int sid, String msgUUID,
                                   long timeout, TimeUnit timeUnit) {
-        return sendMetric(body, bid, tid, dt, sid, "", msgUUID, timeout, timeUnit, "file");
+        return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
     }
 
     private void shutdownInternalThreads() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/MessageSender.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/MessageSender.java
index fc79f66..ca17b54 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/MessageSender.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/MessageSender.java
@@ -28,7 +28,7 @@ public interface MessageSender {
 
     /**
      * This method provides a synchronized function which you want to send data
-     * with extra attributes except  bid,tid,dt,etc
+     * with extra attributes other than groupId, streamId, dt, etc.
      * This method is deprecated,we suggest you don't use it.
      *
      * @param body       The data will be sent
@@ -44,19 +44,19 @@ public interface MessageSender {
      * @param body The data will be sent
      *             
      */
-    public SendResult sendMessage(byte[] body, String bid, String tid, long dt, String msgUUID,
+    public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
                                   long timeout, TimeUnit timeUnit);
 
     /**
      * This method provides a synchronized  function which you want to send data without packing
-     * with extra attributes except  bid,tid,dt,etc
+     * with extra attributes other than groupId, streamId, dt, etc.
      *
      * @param body         The data will be sent
      *                     
      * @param extraAttrMap The attributes you want to add,
      *                     and each element of extraAttrMap contains a pair like <attrKey,attrValue>
      */
-    public SendResult sendMessage(byte[] body, String bid, String tid, long dt, String msgUUID,
+    public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
                                   long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
 
     /**
@@ -65,24 +65,24 @@ public interface MessageSender {
      *
      * @param bodyList The data will be sent,which is a collection consisting of byte arrays
      */
-    public SendResult sendMessage(List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+    public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
                                   long timeout, TimeUnit timeUnit);
 
     /**
      * This method provides a synchronized  function which you want to send data with packing
-     * with extra attributes except  bid,tid,dt,etc
+     * with extra attributes other than groupId, streamId, dt, etc.
      * 
      *
      * @param bodyList     The data will be sent,which is a collection consisting of byte arrays
      * @param extraAttrMap The attributes you want to add,
      *                     and each element of extraAttrMap contains a pair like <attrKey,attrValue>
      */
-    public SendResult sendMessage(List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+    public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
                                   long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
 
     /**
      * This method provides an asynchronized  function which you want to send data
-     * with extra attributes except  bid,tid,dt,etc
+     * with extra attributes other than groupId, streamId, dt, etc.
      * This method is deprecated,we suggest you don't use it.
      * 
      *
@@ -96,7 +96,7 @@ public interface MessageSender {
 
     /**
      * This method provides a synchronized  function which you want to send data without packing
-     * with extra attributes except  bid,tid,dt,etc
+     * with extra attributes other than groupId, streamId, dt, etc.
      * 
      *
      * @param body         The data will be sent
@@ -104,7 +104,7 @@ public interface MessageSender {
      *                     and each element of extraAttrMap contains a pair like <attrKey,attrValue>
      */
     public void asyncSendMessage(SendMessageCallback callback,
-                                 byte[] body, String bid, String tid, long dt, String msgUUID,
+                                 byte[] body, String groupId, String streamId, long dt, String msgUUID,
                                  long timeout, TimeUnit timeUnit,
                                  Map<String, String> extraAttrMap) throws ProxysdkException;
 
@@ -116,7 +116,7 @@ public interface MessageSender {
      * @param body     The data will be sent
      */
     public void asyncSendMessage(SendMessageCallback callback,
-                                 byte[] body, String bid, String tid, long dt, String msgUUID,
+                                 byte[] body, String groupId, String streamId, long dt, String msgUUID,
                                  long timeout, TimeUnit timeUnit) throws ProxysdkException;
 
     /**
@@ -126,12 +126,12 @@ public interface MessageSender {
      * @param bodyList The data will be sent,which is a collection consisting of byte arrays
      */
     public void asyncSendMessage(SendMessageCallback callback,
-                                 List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+                                 List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
                                  long timeout, TimeUnit timeUnit) throws ProxysdkException;
 
     /**
      * This method provides an asynchronized  function which you want to send data with packing
-     * with extra attributes except  bid,tid,dt,etc
+     * with extra attributes other than groupId, streamId, dt, etc.
      * 
      *
      * @param bodyList     The data will be sent,which is a collection consisting of byte arrays
@@ -139,7 +139,7 @@ public interface MessageSender {
      *                     element of extraAttrMap contains a pair like <attrKey,attrValue>
      */
     public void asyncSendMessage(SendMessageCallback callback,
-                                 List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+                                 List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
                                  long timeout, TimeUnit timeUnit,
                                  Map<String, String> extraAttrMap) throws ProxysdkException;
 
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
index 5e437ee..43e9d40 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
@@ -37,7 +37,7 @@ public class ProxyClientConfig {
     private int proxyUpdateIntervalMinutes;
     private int proxyUpdateMaxRetry;
     private String netTag;
-    private String bid;
+    private String groupId;
     private boolean isFile = false;
     private boolean isLocalVisit = true;
     private boolean isNeedDataEncry = false;
@@ -77,10 +77,10 @@ public class ProxyClientConfig {
     private boolean cleanHttpCacheWhenClosing = false;
 
     // config for metric collector
-    // whether use bid as key for metric, default is true
-    private boolean useBidAsKey = true;
-    // whether use tid as key for metric, default is true
-    private boolean useTidAsKey = true;
+    // whether to use groupId as a key for metrics, default is true
+    private boolean useGroupIdAsKey = true;
+    // whether to use streamId as a key for metrics, default is true
+    private boolean useStreamIdAsKey = true;
     // whether use localIp as key for metric, default is true
     private boolean useLocalIpAsKey = true;
     // metric collection interval, default is 1 mins in milliseconds.
@@ -88,12 +88,12 @@ public class ProxyClientConfig {
     // max cache time for proxy config.
     private long maxProxyCacheTimeInMs = 30 * 60 * 1000;
 
-    // metric bid
-    private String metricBid = "inlong_sla_metric";
+    // metric groupId
+    private String metricGroupId = "inlong_sla_metric";
 
     /*pay attention to the last url parameter ip*/
     public ProxyClientConfig(String localHost, boolean isLocalVisit, String managerIp,
-                           int managerPort, String bid, String netTag) throws ProxysdkException {
+                           int managerPort, String groupId, String netTag) throws ProxysdkException {
         if (Utils.isBlank(localHost)) {
             throw new ProxysdkException("localHost is blank!");
         }
@@ -101,7 +101,7 @@ public class ProxyClientConfig {
             throw new IllegalArgumentException("managerIp is Blank!");
         }
         this.proxyIPServiceURL = "http://" + managerIp + ":" + managerPort + "/api/inlong/manager/openapi/dataproxy/getIpList";
-        this.bid = bid;
+        this.groupId = groupId;
         this.netTag = netTag;
         this.isLocalVisit = isLocalVisit;
         this.managerPort = managerPort;
@@ -137,12 +137,12 @@ public class ProxyClientConfig {
         isFile = file;
     }
 
-    public String getBid() {
-        return bid;
+    public String getGroupId() {
+        return groupId;
     }
 
-    public void setBid(String bid) {
-        this.bid = bid;
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
     }
 
     public int getManagerPort() {
@@ -369,20 +369,20 @@ public class ProxyClientConfig {
         this.cleanHttpCacheWhenClosing = cleanHttpCacheWhenClosing;
     }
 
-    public boolean isUseBidAsKey() {
-        return useBidAsKey;
+    public boolean isUseGroupIdAsKey() {
+        return useGroupIdAsKey;
     }
 
-    public void setUseBidAsKey(boolean useBidAsKey) {
-        this.useBidAsKey = useBidAsKey;
+    public void setUseGroupIdAsKey(boolean useGroupIdAsKey) {
+        this.useGroupIdAsKey = useGroupIdAsKey;
     }
 
-    public boolean isUseTidAsKey() {
-        return useTidAsKey;
+    public boolean isUseStreamIdAsKey() {
+        return useStreamIdAsKey;
     }
 
-    public void setUseTidAsKey(boolean useTidAsKey) {
-        this.useTidAsKey = useTidAsKey;
+    public void setUseStreamIdAsKey(boolean useStreamIdAsKey) {
+        this.useStreamIdAsKey = useStreamIdAsKey;
     }
 
     public boolean isUseLocalIpAsKey() {
@@ -401,12 +401,12 @@ public class ProxyClientConfig {
         this.metricIntervalInMs = metricIntervalInMs;
     }
 
-    public String getMetricBid() {
-        return metricBid;
+    public String getMetricGroupId() {
+        return metricGroupId;
     }
 
-    public void setMetricBid(String metricBid) {
-        this.metricBid = metricBid;
+    public void setMetricGroupId(String metricGroupId) {
+        this.metricGroupId = metricGroupId;
     }
 
     public long getMaxProxyCacheTimeInMs() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/EncodeObject.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/EncodeObject.java
index e967965..bdd5e78 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/EncodeObject.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/EncodeObject.java
@@ -39,15 +39,15 @@ public class EncodeObject {
     private long packageTime = System.currentTimeMillis();
     private int cnt = -1;
     private boolean isReport = false;
-    private boolean isBidTransfer = false;
+    private boolean isGroupIdTransfer = false;
     private boolean isSupportLF = false;
     private boolean isAuth = false;
     private boolean isEncrypt = false;
     private boolean isCompress = true;
-    private int bidNum;
-    private int tidNum;
-    private String bid;
-    private String tid;
+    private int groupIdNum;
+    private int streamIdNum;
+    private String groupId;
+    private String streamId;
     private short load;
     private String userName = "";
     private String secretKey = "";
@@ -85,89 +85,92 @@ public class EncodeObject {
 
     // used for bytes initializtion,msgtype=3/5
     public EncodeObject(byte[] bodyBytes, String attributes, String messageId,
-                        int msgtype, boolean isCompress, final String bid) {
+                        int msgtype, boolean isCompress, final String groupId) {
         this.bodyBytes = bodyBytes;
         this.messageId = messageId;
         this.attributes = attributes + "&messageId=" + messageId;
         this.msgtype = msgtype;
-        this.bid = bid;
+        this.groupId = groupId;
         this.isCompress = isCompress;
     }
 
     // used for bodylist initializtion,msgtype=3/5
     public EncodeObject(List<byte[]> bodyList, String attributes, String messageId,
-                        int msgtype, boolean isCompress, final String bid) {
+                        int msgtype, boolean isCompress, final String groupId) {
         this.bodylist = bodyList;
         this.messageId = messageId;
         this.attributes = attributes + "&messageId=" + messageId;
         this.msgtype = msgtype;
-        this.bid = bid;
+        this.groupId = groupId;
         this.isCompress = isCompress;
     }
 
     // used for bytes initializtion,msgtype=7/8
     public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress, boolean isReport,
-                        boolean isBidTransfer, long dt, long seqId, String bid, String tid, String commonattr) {
+                        boolean isGroupIdTransfer, long dt, long seqId, String groupId,
+                        String streamId, String commonattr) {
         this.bodyBytes = bodyBytes;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
         this.isReport = isReport;
         this.dt = dt;
-        this.isBidTransfer = isBidTransfer;
+        this.isGroupIdTransfer = isGroupIdTransfer;
         this.commonattr = commonattr;
         this.messageId = String.valueOf(seqId);
-        this.bid = bid;
-        this.tid = tid;
+        this.groupId = groupId;
+        this.streamId = streamId;
     }
 
     // used for bodylist initializtion,msgtype=7/8
     public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
-                        boolean isReport, boolean isBidTransfer, long dt,
-                        long seqId, String bid, String tid, String commonattr) {
+                        boolean isReport, boolean isGroupIdTransfer, long dt,
+                        long seqId, String groupId, String streamId, String commonattr) {
         this.bodylist = bodyList;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
         this.isReport = isReport;
         this.dt = dt;
-        this.isBidTransfer = isBidTransfer;
+        this.isGroupIdTransfer = isGroupIdTransfer;
         this.commonattr = commonattr;
         this.messageId = String.valueOf(seqId);
-        this.bid = bid;
-        this.tid = tid;
+        this.groupId = groupId;
+        this.streamId = streamId;
     }
 
     // file agent, used for bytes initializtion,msgtype=7/8
     public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress,
-                        boolean isReport, boolean isBidTransfer, long dt,
-                        long seqId, String bid, String tid, String commonattr, String messageKey, String proxyIp) {
+                        boolean isReport, boolean isGroupIdTransfer, long dt,
+                        long seqId, String groupId, String streamId, String commonattr,
+                        String messageKey, String proxyIp) {
         this.bodyBytes = bodyBytes;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
         this.isReport = isReport;
         this.dt = dt;
-        this.isBidTransfer = isBidTransfer;
+        this.isGroupIdTransfer = isGroupIdTransfer;
         this.commonattr = commonattr;
         this.messageId = String.valueOf(seqId);
-        this.bid = bid;
-        this.tid = tid;
+        this.groupId = groupId;
+        this.streamId = streamId;
         this.messageKey = messageKey;
         this.proxyIp = proxyIp;
     }
 
     // file agent, used for bodylist initializtion,msgtype=7/8
     public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
-                        boolean isReport, boolean isBidTransfer, long dt,
-                        long seqId, String bid, String tid, String commonattr, String messageKey, String proxyIp) {
+                        boolean isReport, boolean isGroupIdTransfer, long dt,
+                        long seqId, String groupId, String streamId, String commonattr,
+                        String messageKey, String proxyIp) {
         this.bodylist = bodyList;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
         this.isReport = isReport;
         this.dt = dt;
-        this.isBidTransfer = isBidTransfer;
+        this.isGroupIdTransfer = isGroupIdTransfer;
         this.commonattr = commonattr;
         this.messageId = String.valueOf(seqId);
-        this.bid = bid;
-        this.tid = tid;
+        this.groupId = groupId;
+        this.streamId = streamId;
         this.messageKey = messageKey;
         this.proxyIp = proxyIp;
     }
@@ -180,12 +183,12 @@ public class EncodeObject {
         this.msgUUID = msgUUID;
     }
 
-    public boolean isBidTransfer() {
-        return isBidTransfer;
+    public boolean isGroupIdTransfer() {
+        return isGroupIdTransfer;
     }
 
-    public void setBidTransfer(boolean isBidTransfer) {
-        this.isBidTransfer = isBidTransfer;
+    public void setGroupIdTransfer(boolean isGroupIdTransfer) {
+        this.isGroupIdTransfer = isGroupIdTransfer;
     }
 
     public short getLoad() {
@@ -196,20 +199,20 @@ public class EncodeObject {
         this.load = load;
     }
 
-    public String getBid() {
-        return bid;
+    public String getGroupId() {
+        return groupId;
     }
 
-    public void setBid(String bid) {
-        this.bid = bid;
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
     }
 
-    public String getTid() {
-        return tid;
+    public String getStreamId() {
+        return streamId;
     }
 
-    public void setTid(String tid) {
-        this.tid = tid;
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
     }
 
     public void setMsgtype(int msgtype) {
@@ -262,20 +265,20 @@ public class EncodeObject {
         this.encryptEntry = encryptEntry;
     }
 
-    public int getBidNum() {
-        return bidNum;
+    public int getGroupIdNum() {
+        return groupIdNum;
     }
 
-    public void setBidNum(int bidNum) {
-        this.bidNum = bidNum;
+    public void setGroupIdNum(int groupIdNum) {
+        this.groupIdNum = groupIdNum;
     }
 
-    public int getTidNum() {
-        return tidNum;
+    public int getStreamIdNum() {
+        return streamIdNum;
     }
 
-    public void setTidNum(int tidNum) {
-        this.tidNum = tidNum;
+    public void setStreamIdNum(int streamIdNum) {
+        this.streamIdNum = streamIdNum;
     }
 
     public long getDt() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/ProtocolEncoder.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/ProtocolEncoder.java
index 7edd389..456156b 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/ProtocolEncoder.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/ProtocolEncoder.java
@@ -136,11 +136,11 @@ public class ProtocolEncoder extends OneToOneEncoder {
                     body = EncryptUtil.desEncrypt(body, encryptInfo.getDesKey());
                 }
             }
-            if (!object.isBidTransfer()) {
+            if (!object.isGroupIdTransfer()) {
                 if (Utils.isNotBlank(endAttr)) {
                     endAttr = endAttr + "&";
                 }
-                endAttr = (endAttr + "bid=" + object.getBid() + "&tid=" + object.getTid());
+                endAttr = (endAttr + "groupId=" + object.getGroupId() + "&streamId=" + object.getStreamId());
             }
             if (Utils.isNotBlank(object.getMsgUUID())) {
                 if (Utils.isNotBlank(endAttr)) {
@@ -159,12 +159,12 @@ public class ProtocolEncoder extends OneToOneEncoder {
             totalLength = totalLength + body.length + endAttr.getBytes("utf8").length;
             buf.writeInt(totalLength);
             buf.writeByte(msgType);
-            buf.writeShort(object.getBidNum());
-            buf.writeShort(object.getTidNum());
+            buf.writeShort(object.getGroupIdNum());
+            buf.writeShort(object.getStreamIdNum());
             String bitStr = object.isSupportLF() ? "1" : "0";
             bitStr += (object.getMessageKey().equals("minute")) ? "1" : "0";
             bitStr += (object.getMessageKey().equals("file")) ? "1" : "0";
-            bitStr += !object.isBidTransfer() ? "1" : "0";
+            bitStr += !object.isGroupIdTransfer() ? "1" : "0";
             bitStr += object.isReport() ? "1" : "0";
             bitStr += "0";
             buf.writeShort(Integer.parseInt(bitStr, 2));
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/EncryptConfigEntry.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/EncryptConfigEntry.java
index 82b68a1..9f5b8d4 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/EncryptConfigEntry.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/EncryptConfigEntry.java
@@ -138,7 +138,7 @@ public class EncryptConfigEntry implements java.io.Serializable {
     }
 
     public String toString() {
-        return "{\"version\":\"" + version + "\",\"public_key\":\"" + pubKey + "\",\"bid\":\"" + userName + "\"}";
+        return "{\"version\":\"" + version + "\",\"public_key\":\"" + pubKey + "\",\"groupId\":\"" + userName + "\"}";
     }
 
 }
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigEntry.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigEntry.java
index 2a994fd..846b057 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigEntry.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigEntry.java
@@ -22,11 +22,11 @@ import java.util.Map;
 
 public class ProxyConfigEntry implements java.io.Serializable {
     private String clusterId;
-    private String bid;
+    private String groupId;
     private int size;
     private Map<String, HostInfo> hostMap;
-    private int bidNum;
-    private Map<String, Integer> tidNumMap;
+    private int groupIdNum;
+    private Map<String, Integer> streamIdNumMap;
     private int load;
     private int switchStat;
     private boolean isInterVisit;
@@ -39,17 +39,17 @@ public class ProxyConfigEntry implements java.io.Serializable {
         this.load = load;
     }
 
-    public int getBidNum() {
-        return bidNum;
+    public int getGroupIdNum() {
+        return groupIdNum;
     }
 
-    public Map<String, Integer> getTidNumMap() {
-        return tidNumMap;
+    public Map<String, Integer> getStreamIdNumMap() {
+        return streamIdNumMap;
     }
 
-    public void setBidNumAndTidNumMap(int bidNum, Map<String, Integer> tidNumMap) {
-        this.bidNum = bidNum;
-        this.tidNumMap = tidNumMap;
+    public void setGroupIdNumAndStreamIdNumMap(int groupIdNum, Map<String, Integer> streamIdNumMap) {
+        this.groupIdNum = groupIdNum;
+        this.streamIdNumMap = streamIdNumMap;
     }
 
     public int getSwitchStat() {
@@ -73,12 +73,12 @@ public class ProxyConfigEntry implements java.io.Serializable {
         return size;
     }
 
-    public String getBid() {
-        return bid;
+    public String getGroupId() {
+        return groupId;
     }
 
-    public void setBid(String bid) {
-        this.bid = bid;
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
     }
 
     public boolean isInterVisit() {
@@ -91,8 +91,9 @@ public class ProxyConfigEntry implements java.io.Serializable {
 
     @Override
     public String toString() {
-        return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", bsn=" + bidNum + ", tsnMap=" + tidNumMap
-                + ", size=" + size + ", isInterVisit=" + isInterVisit + ", bid=" + bid
+        return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", bsn="
+                + groupIdNum + ", tsnMap=" + streamIdNumMap
+                + ", size=" + size + ", isInterVisit=" + isInterVisit + ", groupId=" + groupId
                 + ", switch=" + switchStat + "]";
     }
 
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
index e61b2e2..f846117 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
@@ -82,7 +82,7 @@ public class ProxyConfigManager extends Thread {
     private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>();
     /*the status of the cluster.if this value is changed,we need rechoose  three proxy*/
     private int oldStat = 0;
-    private String businessId;
+    private String groupId;
     private final ProxyClientConfig clientConfig;
     private final String localIP;
     private String localMd5;
@@ -101,12 +101,12 @@ public class ProxyConfigManager extends Thread {
         this.clientManager = clientManager;
     }
 
-    public String getBusinessId() {
-        return businessId;
+    public String getGroupId() {
+        return groupId;
     }
 
-    public void setBusinessId(String businessId) {
-        this.businessId = businessId;
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
     }
 
     public void shutDown() {
@@ -158,7 +158,7 @@ public class ProxyConfigManager extends Thread {
             if (diffTime < clientConfig.getMaxProxyCacheTimeInMs()) {
                 JsonReader reader = new JsonReader(new FileReader(configCachePath));
                 ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader, ProxyConfigEntry.class);
-                logger.info("{} has a backup! {}", businessId, proxyConfigEntry);
+                logger.info("{} has a backup! {}", groupId, proxyConfigEntry);
                 return proxyConfigEntry;
             }
         } catch (Exception ex) {
@@ -199,14 +199,14 @@ public class ProxyConfigManager extends Thread {
     }
 
     /**
-     *  get bid config
+     *  get groupId config
      *
      * @return proxyConfigEntry
      * @throws Exception
      */
-    public ProxyConfigEntry getBidConfigure() throws Exception {
+    public ProxyConfigEntry getGroupIdConfigure() throws Exception {
         ProxyConfigEntry proxyEntry;
-        String configAddr = clientConfig.getConfStoreBasePath() + businessId;
+        String configAddr = clientConfig.getConfStoreBasePath() + groupId;
         if (this.clientConfig.isReadProxyIPFromLocal()) {
             configAddr = configAddr + ".local";
             proxyEntry = getLocalProxyListFromFile(configAddr);
@@ -242,7 +242,7 @@ public class ProxyConfigManager extends Thread {
             localMd5 = calcHostInfoMd5(proxyInfoList);
         }
         ProxyConfigEntry proxyEntry = null;
-        String configAddr = clientConfig.getConfStoreBasePath() + businessId;
+        String configAddr = clientConfig.getConfStoreBasePath() + groupId;
         if (clientConfig.isReadProxyIPFromLocal()) {
             configAddr = configAddr + ".local";
             proxyEntry = getLocalProxyListFromFile(configAddr);
@@ -298,8 +298,8 @@ public class ProxyConfigManager extends Thread {
             if (proxyEntry.getSize() != 0) {
                 /* Initialize the current proxy information list first. */
                 clientManager.setLoadThreshold(proxyEntry.getLoad());
-                clientManager.setBidNum(proxyEntry.getBidNum());
-                clientManager.setTidMap(proxyEntry.getTidNumMap());
+                clientManager.setGroupIdNum(proxyEntry.getGroupIdNum());
+                clientManager.setStreamIdMap(proxyEntry.getStreamIdNumMap());
 
                 List<HostInfo> newProxyInfoList = new ArrayList<HostInfo>();
                 for (Map.Entry<String, HostInfo> entry : proxyEntry.getHostMap().entrySet()) {
@@ -559,9 +559,9 @@ public class ProxyConfigManager extends Thread {
             throw new Exception("Read local proxyList File failure by " + filePath + ", reason is " + e.getCause());
         }
 
-        int bidNum = 0;
+        int groupIdNum = 0;
         if (localProxyAddrJson.has("bsn")) {
-            bidNum = localProxyAddrJson.get("bsn").getAsInt();
+            groupIdNum = localProxyAddrJson.get("bsn").getAsInt();
         }
 
         int load = ConfigConstants.LOAD_THRESHOLD;
@@ -570,15 +570,15 @@ public class ProxyConfigManager extends Thread {
             load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
         }
         ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
-        proxyEntry.setBid(clientConfig.getBid());
+        proxyEntry.setGroupId(clientConfig.getGroupId());
         boolean isInterVisit = checkValidProxy(filePath, localProxyAddrJson);
         proxyEntry.setInterVisit(isInterVisit);
         Map<String, HostInfo> hostMap = getHostInfoMap(
             localProxyAddrJson);
         proxyEntry.setHostMap(hostMap);
         proxyEntry.setSwitchStat(0);
-        Map<String, Integer> tidMap = getTidMap(localProxyAddrJson);
-        proxyEntry.setBidNumAndTidNumMap(bidNum, tidMap);
+        Map<String, Integer> streamIdMap = getStreamIdMap(localProxyAddrJson);
+        proxyEntry.setGroupIdNumAndStreamIdNumMap(groupIdNum, streamIdMap);
         proxyEntry.setLoad(load);
         if (localProxyAddrJson.has("cluster_id")) {
             proxyEntry.setClusterId(localProxyAddrJson.get("cluster_id").getAsString());
@@ -624,18 +624,18 @@ public class ProxyConfigManager extends Thread {
         return hostMap;
     }
 
-    private Map<String, Integer> getTidMap(JsonObject localProxyAddrJson) {
-        Map<String, Integer> tidMap = new HashMap<String, Integer>();
+    private Map<String, Integer> getStreamIdMap(JsonObject localProxyAddrJson) {
+        Map<String, Integer> streamIdMap = new HashMap<String, Integer>();
         if (localProxyAddrJson.has("tsn")) {
-            JsonArray jsontid = localProxyAddrJson.getAsJsonArray("tsn");
-            for (int i = 0; i < jsontid.size(); i++) {
-                JsonObject jsonItem = jsontid.get(i).getAsJsonObject();
-                if (jsonItem != null && jsonItem.has("tid") && jsonItem.has("sn")) {
-                    tidMap.put(jsonItem.get("tid").getAsString(), jsonItem.get("sn").getAsInt());
+            JsonArray jsonStreamId = localProxyAddrJson.getAsJsonArray("tsn");
+            for (int i = 0; i < jsonStreamId.size(); i++) {
+                JsonObject jsonItem = jsonStreamId.get(i).getAsJsonObject();
+                if (jsonItem != null && jsonItem.has("streamId") && jsonItem.has("sn")) {
+                    streamIdMap.put(jsonItem.get("streamId").getAsString(), jsonItem.get("sn").getAsInt());
                 }
             }
         }
-        return tidMap;
+        return streamIdMap;
     }
 
     private boolean checkValidProxy(String filePath, JsonObject localProxyAddrJson) throws Exception {
@@ -673,9 +673,9 @@ public class ProxyConfigManager extends Thread {
         if (hostMap == null) {
             return null;
         }
-        int bidNum = 0;
+        int groupIdNum = 0;
         if (jsonRes.has("bsn")) {
-            bidNum = jsonRes.get("bsn").getAsInt();
+            groupIdNum = jsonRes.get("bsn").getAsInt();
         }
         int load = ConfigConstants.LOAD_THRESHOLD;
         if (jsonRes.has("load")) {
@@ -683,12 +683,12 @@ public class ProxyConfigManager extends Thread {
             load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
         }
         ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
-        proxyEntry.setBid(clientConfig.getBid());
+        proxyEntry.setGroupId(clientConfig.getGroupId());
         proxyEntry.setInterVisit(true);
         proxyEntry.setHostMap(hostMap);
         proxyEntry.setSwitchStat(0);
-        Map<String, Integer> tidMap = getTidMap(jsonRes);
-        proxyEntry.setBidNumAndTidNumMap(bidNum, tidMap);
+        Map<String, Integer> streamIdMap = getStreamIdMap(jsonRes);
+        proxyEntry.setGroupIdNumAndStreamIdNumMap(groupIdNum, streamIdMap);
         proxyEntry.setLoad(load);
         if (jsonRes.has("cluster_id")) {
             proxyEntry.setClusterId(jsonRes.get("cluster_id").getAsString());
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/demo/Event.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/demo/Event.java
index b1c22b5..4d6ceeb 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/demo/Event.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/demo/Event.java
@@ -22,26 +22,26 @@ import java.util.ArrayList;
 
 public class Event {
     private byte[] body;
-    private String bid;
-    private String tid;
+    private String groupId;
+    private String streamId;
     private long dt;
     private int tryTimes = 0;
     ArrayList<byte[]> bodylist = new ArrayList<byte[]>();
 
-    public Event(byte[] body, String bid, String tid, long dt) {
+    public Event(byte[] body, String groupId, String streamId, long dt) {
         super();
         this.body = body;
-        this.bid = bid;
-        this.tid = tid;
+        this.groupId = groupId;
+        this.streamId = streamId;
         this.dt = dt;
         this.setTryTimes(0);
     }
 
-    public Event(ArrayList<byte[]> bodylist, String bid, String tid, long dt) {
+    public Event(ArrayList<byte[]> bodylist, String groupId, String streamId, long dt) {
         super();
         this.bodylist = bodylist;
-        this.bid = bid;
-        this.tid = tid;
+        this.groupId = groupId;
+        this.streamId = streamId;
         this.dt = dt;
         this.setTryTimes(0);
     }
@@ -62,20 +62,20 @@ public class Event {
         this.body = body;
     }
 
-    public String getBid() {
-        return bid;
+    public String getGroupId() {
+        return groupId;
     }
 
-    public void setBid(String bid) {
-        this.bid = bid;
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
     }
 
-    public String getTid() {
-        return tid;
+    public String getStreamId() {
+        return streamId;
     }
 
-    public void setTid(String tid) {
-        this.tid = tid;
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
     }
 
     public long getDt() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/http/InternalHttpSender.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/http/InternalHttpSender.java
index 659a70b..e0b7968 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/http/InternalHttpSender.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/http/InternalHttpSender.java
@@ -87,16 +87,16 @@ public class InternalHttpSender {
      * construct header
      *
      * @param bodies
-     * @param bid
-     * @param tid
+     * @param groupId
+     * @param streamId
      * @param dt
      * @return
      */
     private ArrayList<BasicNameValuePair> getHeaders(List<String> bodies,
-                                                     String bid, String tid, long dt) {
+                                                     String groupId, String streamId, long dt) {
         ArrayList<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
-        params.add(new BasicNameValuePair("bid", bid));
-        params.add(new BasicNameValuePair("tid", tid));
+        params.add(new BasicNameValuePair("groupId", groupId));
+        params.add(new BasicNameValuePair("streamId", streamId));
         params.add(new BasicNameValuePair("dt", String.valueOf(dt)));
         params.add(new BasicNameValuePair("body", StringUtils.join(bodies, "\n")));
         params.add(new BasicNameValuePair("cnt", String.valueOf(bodies.size())));
@@ -137,8 +137,8 @@ public class InternalHttpSender {
                         HttpMessage httpMessage = messageCache.poll();
                         if (httpMessage != null) {
                             SendResult result = sendMessageWithHostInfo(
-                                    httpMessage.getBodies(), httpMessage.getBid(),
-                                    httpMessage.getTid(), httpMessage.getDt(),
+                                    httpMessage.getBodies(), httpMessage.getGroupId(),
+                                    httpMessage.getStreamId(), httpMessage.getDt(),
                                     httpMessage.getTimeout(), httpMessage.getTimeUnit());
                             httpMessage.getCallback().onMessageAck(result);
                         }
@@ -168,8 +168,8 @@ public class InternalHttpSender {
      * send request by http
      *
      * @param bodies
-     * @param bid
-     * @param tid
+     * @param groupId
+     * @param streamId
      * @param dt
      * @param timeout
      * @param timeUnit
@@ -177,7 +177,7 @@ public class InternalHttpSender {
      * @return
      * @throws Exception
      */
-    private SendResult sendByHttp(List<String> bodies, String bid, String tid, long dt,
+    private SendResult sendByHttp(List<String> bodies, String groupId, String streamId, long dt,
                                   long timeout, TimeUnit timeUnit, HostInfo hostInfo) throws Exception {
         HttpPost httpPost = null;
         CloseableHttpResponse response = null;
@@ -191,7 +191,7 @@ public class InternalHttpSender {
             httpPost = new HttpPost(url);
             httpPost.setHeader(HttpHeaders.CONNECTION, "close");
             httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
-            ArrayList<BasicNameValuePair> contents = getHeaders(bodies, bid, tid, dt);
+            ArrayList<BasicNameValuePair> contents = getHeaders(bodies, groupId, streamId, dt);
             String s = URLEncodedUtils.format(contents, StandardCharsets.UTF_8);
             logger.info("encode string is {}", s);
             httpPost.setEntity(new StringEntity(s));
@@ -233,21 +233,21 @@ public class InternalHttpSender {
      * send message with host info
      *
      * @param bodies
-     * @param bid
-     * @param tid
+     * @param groupId
+     * @param streamId
      * @param dt
      * @param timeout
      * @param timeUnit
      * @return
      */
-    public SendResult sendMessageWithHostInfo(List<String> bodies, String bid, String tid, long dt,
+    public SendResult sendMessageWithHostInfo(List<String> bodies, String groupId, String streamId, long dt,
                                               long timeout, TimeUnit timeUnit) {
 
         List<HostInfo> randomHostList = getRandomHostInfo();
         Exception tmpException = null;
         for (HostInfo hostInfo : randomHostList) {
             try {
-                return sendByHttp(bodies, bid, tid, dt, timeout, timeUnit, hostInfo);
+                return sendByHttp(bodies, groupId, streamId, dt, timeout, timeUnit, hostInfo);
             } catch (Exception exception) {
                 tmpException = exception;
                 logger.debug("error while sending data, resending it", exception);
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/metric/MessageRecord.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/metric/MessageRecord.java
index 8ac9153..fed67ed 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/metric/MessageRecord.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/metric/MessageRecord.java
@@ -27,14 +27,15 @@ public class MessageRecord {
     private final long startTime;
     private final long dt;
 
-    private final String bid;
-    private final String tid;
+    private final String groupId;
+    private final String streamId;
     private final String localIp;
     private final long packTime;
 
-    public MessageRecord(String bid, String tid, String localIp, String msgId, long dt, long packTime, int msgCount) {
-        this.bid = bid;
-        this.tid = tid;
+    public MessageRecord(String groupId, String streamId, String localIp,
+                         String msgId, long dt, long packTime, int msgCount) {
+        this.groupId = groupId;
+        this.streamId = streamId;
         this.localIp = localIp;
         this.msgUUID = msgId;
         this.msgCount = msgCount;
@@ -63,12 +64,12 @@ public class MessageRecord {
         return dt;
     }
 
-    public String getBid() {
-        return bid;
+    public String getGroupId() {
+        return groupId;
     }
 
-    public String getTid() {
-        return tid;
+    public String getStreamId() {
+        return streamId;
     }
 
     public String getLocalIp() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/ClientMgr.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/ClientMgr.java
index 1b87169..aa32d0d 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/ClientMgr.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/ClientMgr.java
@@ -75,9 +75,9 @@ public class ClientMgr {
     private SendHBThread sendHBThread;
     private ProxyConfigManager ipManager;
 
-    private int bidNum = 0;
-    private String bid = "";
-    private Map<String, Integer> tidMap = new HashMap<String, Integer>();
+    private int groupIdNum = 0;
+    private String groupId = "";
+    private Map<String, Integer> streamIdMap = new HashMap<String, Integer>();
     private int loadThreshold;
     private int loadCycle = 0;
     private static final int[] weight = {
@@ -101,28 +101,28 @@ public class ClientMgr {
         this.loadThreshold = loadThreshold;
     }
 
-    public int getBidNum() {
-        return bidNum;
+    public int getGroupIdNum() {
+        return groupIdNum;
     }
 
-    public void setBidNum(int bidNum) {
-        this.bidNum = bidNum;
+    public void setGroupIdNum(int groupIdNum) {
+        this.groupIdNum = groupIdNum;
     }
 
-    public String getBid() {
-        return bid;
+    public String getGroupId() {
+        return groupId;
     }
 
-    public void setBid(String bid) {
-        this.bid = bid;
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
     }
 
-    public Map<String, Integer> getTidMap() {
-        return tidMap;
+    public Map<String, Integer> getStreamIdMap() {
+        return streamIdMap;
     }
 
-    public void setTidMap(Map<String, Integer> tidMap) {
-        this.tidMap = tidMap;
+    public void setStreamIdMap(Map<String, Integer> streamIdMap) {
+        this.streamIdMap = streamIdMap;
     }
 
     public EncryptConfigEntry getEncryptConfigEntry() {
@@ -195,9 +195,9 @@ public class ClientMgr {
         /* ready to Start the thread which refreshes the proxy list. */
         ipManager = new ProxyConfigManager(configure, Utils.getLocalIp(), this);
         ipManager.setName("proxyConfigManager");
-        if (configure.getBid() != null) {
-            ipManager.setBusinessId(configure.getBid());
-            bid = configure.getBid();
+        if (configure.getGroupId() != null) {
+            ipManager.setGroupId(configure.getGroupId());
+            groupId = configure.getGroupId();
         }
 
         /*
@@ -222,8 +222,8 @@ public class ClientMgr {
         this.sendHBThread.start();
     }
 
-    public ProxyConfigEntry getBidConfigureInfo() throws Exception {
-        return ipManager.getBidConfigure();
+    public ProxyConfigEntry getGroupIdConfigureInfo() throws Exception {
+        return ipManager.getGroupIdConfigure();
     }
 
     private boolean initConnection(HostInfo host) {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpMessage.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpMessage.java
index 712b113..4402f79 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpMessage.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpMessage.java
@@ -27,18 +27,18 @@ import org.apache.inlong.dataproxy.SendMessageCallback;
  * http message for cache.
  */
 public class HttpMessage {
-    private final String bid;
-    private final String tid;
+    private final String groupId;
+    private final String streamId;
     private final List<String> bodies;
     private final SendMessageCallback callback;
     private final long dt;
     private final long timeout;
     private final TimeUnit timeUnit;
 
-    public HttpMessage(List<String> bodies, String bid, String tid, long dt,
+    public HttpMessage(List<String> bodies, String groupId, String streamId, long dt,
                        long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
-        this.bid = bid;
-        this.tid = tid;
+        this.groupId = groupId;
+        this.streamId = streamId;
         this.bodies = bodies;
         this.callback = callback;
         this.dt = dt;
@@ -46,12 +46,12 @@ public class HttpMessage {
         this.timeUnit = timeUnit;
     }
 
-    public String getBid() {
-        return bid;
+    public String getGroupId() {
+        return groupId;
     }
 
-    public String getTid() {
-        return tid;
+    public String getStreamId() {
+        return streamId;
     }
 
     public List<String> getBodies() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpProxySender.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpProxySender.java
index 37ffd82..ce78e41 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpProxySender.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpProxySender.java
@@ -70,7 +70,7 @@ public class HttpProxySender extends Thread {
         try {
             proxyConfigManager = new ProxyConfigManager(configure,
                     Utils.getLocalIp(), null);
-            proxyConfigManager.setBusinessId(configure.getBid());
+            proxyConfigManager.setGroupId(configure.getGroupId());
             ProxyConfigEntry proxyConfigEntry = retryGettingProxyConfig();
             hostList.addAll(proxyConfigEntry.getHostMap().values());
 
@@ -92,7 +92,7 @@ public class HttpProxySender extends Thread {
      * @return proxy config entry.
      */
     private ProxyConfigEntry retryGettingProxyConfig() throws Exception {
-        return proxyConfigManager.getBidConfigure();
+        return proxyConfigManager.getGroupIdConfigure();
     }
 
     /**
@@ -106,7 +106,7 @@ public class HttpProxySender extends Thread {
                 int randSleepTime = proxyClientConfig.getProxyHttpUpdateIntervalMinutes() * 60 + rand;
                 TimeUnit.MILLISECONDS.sleep(randSleepTime * 1000);
                 if (proxyConfigManager != null) {
-                    ProxyConfigEntry proxyConfigEntry = proxyConfigManager.getBidConfigure();
+                    ProxyConfigEntry proxyConfigEntry = proxyConfigManager.getGroupIdConfigure();
                     hostList.addAll(proxyConfigEntry.getHostMap().values());
                     hostList.retainAll(proxyConfigEntry.getHostMap().values());
                 } else {
@@ -125,38 +125,38 @@ public class HttpProxySender extends Thread {
      * send by http
      *
      * @param body
-     * @param bid
-     * @param tid
+     * @param groupId
+     * @param streamId
      * @param dt
      * @param timeout
      * @param timeUnit
      * @return
      */
-    public SendResult sendMessage(String body, String bid, String tid, long dt,
+    public SendResult sendMessage(String body, String groupId, String streamId, long dt,
                                   long timeout, TimeUnit timeUnit) {
-        return sendMessage(Collections.singletonList(body), bid, tid, dt, timeout, timeUnit);
+        return sendMessage(Collections.singletonList(body), groupId, streamId, dt, timeout, timeUnit);
     }
 
     /**
      * send multiple messages.
      *
      * @param bodies   list of bodies
-     * @param bid
-     * @param tid
+     * @param groupId
+     * @param streamId
      * @param dt
      * @param timeout
      * @param timeUnit
      * @return
      */
-    public SendResult sendMessage(List<String> bodies, String bid, String tid, long dt,
+    public SendResult sendMessage(List<String> bodies, String groupId, String streamId, long dt,
                                   long timeout, TimeUnit timeUnit) {
         if (hostList.isEmpty()) {
             logger.error("proxy list is empty, maybe client has been "
-                    + "closed or bid is not assigned with proxy list");
+                    + "closed or groupId is not assigned with proxy list");
             return SendResult.NO_CONNECTION;
         }
         return internalHttpSender.sendMessageWithHostInfo(
-                bodies, bid, tid, dt, timeout, timeUnit);
+                bodies, groupId, streamId, dt, timeout, timeUnit);
 
     }
 
@@ -164,17 +164,17 @@ public class HttpProxySender extends Thread {
      * async sender
      *
      * @param bodies
-     * @param bid
-     * @param tid
+     * @param groupId
+     * @param streamId
      * @param dt
      * @param timeout
      * @param timeUnit
      * @param callback
      */
-    public void asyncSendMessage(List<String> bodies, String bid, String tid, long dt,
+    public void asyncSendMessage(List<String> bodies, String groupId, String streamId, long dt,
                                  long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
         List<String> bodyList = new ArrayList<>(bodies);
-        HttpMessage httpMessage = new HttpMessage(bodyList, bid, tid, dt,
+        HttpMessage httpMessage = new HttpMessage(bodyList, groupId, streamId, dt,
                 timeout, timeUnit, callback);
         try {
             if (!messageCache.offer(httpMessage)) {
@@ -200,16 +200,16 @@ public class HttpProxySender extends Thread {
      * async send single message.
      *
      * @param body
-     * @param bid
-     * @param tid
+     * @param groupId
+     * @param streamId
      * @param dt
      * @param timeout
      * @param timeUnit
      * @param callback
      */
-    public void asyncSendMessage(String body, String bid, String tid, long dt,
+    public void asyncSendMessage(String body, String groupId, String streamId, long dt,
                                  long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
-        asyncSendMessage(Collections.singletonList(body), bid, tid,
+        asyncSendMessage(Collections.singletonList(body), groupId, streamId,
                 dt, timeout, timeUnit, callback);
     }
 
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/Sender.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/Sender.java
index f29630b..d4938df 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/Sender.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/Sender.java
@@ -77,7 +77,7 @@ public class Sender {
         this.clientMgr = new ClientMgr(configure, this, selfDefineFactory);
         ProxyConfigEntry proxyConfigEntry = null;
         try {
-            proxyConfigEntry = this.clientMgr.getBidConfigureInfo();
+            proxyConfigEntry = this.clientMgr.getGroupIdConfigureInfo();
             setClusterId(proxyConfigEntry.getClusterId());
         } catch (Throwable e) {
             if (configure.isReadProxyIPFromLocal()) {
@@ -186,17 +186,17 @@ public class Sender {
             return SendResult.INVALID_ATTRIBUTES;
         }
         if (encodeObject.getMsgtype() == 7) {
-            int bidnum = 0;
-            int tidnum = 0;
-            if (encodeObject.getBid().equals(clientMgr.getBid())) {
-                bidnum = clientMgr.getBidNum();
-                tidnum = clientMgr.getTidMap().get(encodeObject.getTid()) != null
-                    ? clientMgr.getTidMap().get(encodeObject.getTid()) : 0;
+            int groupIdnum = 0;
+            int streamIdnum = 0;
+            if (encodeObject.getGroupId().equals(clientMgr.getGroupId())) {
+                groupIdnum = clientMgr.getGroupIdNum();
+                streamIdnum = clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null
+                    ? clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) : 0;
             }
-            encodeObject.setBidNum(bidnum);
-            encodeObject.setTidNum(tidnum);
-            if (bidnum == 0 || tidnum == 0) {
-                encodeObject.setBidTransfer(false);
+            encodeObject.setGroupIdNum(groupIdnum);
+            encodeObject.setStreamIdNum(streamIdnum);
+            if (groupIdnum == 0 || streamIdnum == 0) {
+                encodeObject.setGroupIdTransfer(false);
             }
         }
         if (this.configure.isNeedDataEncry()) {
@@ -216,7 +216,7 @@ public class Sender {
     public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID,
                                       long timeout, TimeUnit timeUnit) {
         metricWorker.recordNumByKey(encodeObject.getMessageId(),
-                encodeObject.getBid(), encodeObject.getTid(),
+                encodeObject.getGroupId(), encodeObject.getStreamId(),
                 Utils.getLocalIp(), encodeObject.getDt(), encodeObject.getPackageTime(), encodeObject.getRealCnt());
         NettyClient client = null;
         SendResult message = null;
@@ -277,17 +277,17 @@ public class Sender {
         }
 
         if (encodeObject.getMsgtype() == 7) {
-            int bidnum = 0;
-            int tidnum = 0;
-            if (encodeObject.getBid().equals(clientMgr.getBid())) {
-                bidnum = clientMgr.getBidNum();
-                tidnum = clientMgr.getTidMap().get(encodeObject.getTid()) != null
-                    ? clientMgr.getTidMap().get(encodeObject.getTid()) : 0;
+            int groupIdnum = 0;
+            int streamIdnum = 0;
+            if (encodeObject.getGroupId().equals(clientMgr.getGroupId())) {
+                groupIdnum = clientMgr.getGroupIdNum();
+                streamIdnum = clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null
+                    ? clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) : 0;
             }
-            encodeObject.setBidNum(bidnum);
-            encodeObject.setTidNum(tidnum);
-            if (bidnum == 0 || tidnum == 0) {
-                encodeObject.setBidTransfer(false);
+            encodeObject.setGroupIdNum(groupIdnum);
+            encodeObject.setStreamIdNum(streamIdnum);
+            if (groupIdnum == 0 || streamIdnum == 0) {
+                encodeObject.setGroupIdTransfer(false);
             }
         }
         if (this.configure.isNeedDataEncry()) {
@@ -425,17 +425,17 @@ public class Sender {
         msgQueueMap.put(encodeObject.getMessageId(), new QueueObject(System.currentTimeMillis(),
                 callback, size, timeout, timeUnit));
         if (encodeObject.getMsgtype() == 7) {
-            int bidnum = 0;
-            int tidnum = 0;
-            if ((clientMgr.getBid().length() != 0) && (encodeObject.getBid().equals(clientMgr.getBid()))) {
-                bidnum = clientMgr.getBidNum();
-                tidnum = (clientMgr.getTidMap().get(encodeObject.getTid()) != null)
-                        ? clientMgr.getTidMap().get(encodeObject.getTid()) : 0;
+            int groupIdnum = 0;
+            int streamIdnum = 0;
+            if ((clientMgr.getGroupId().length() != 0) && (encodeObject.getGroupId().equals(clientMgr.getGroupId()))) {
+                groupIdnum = clientMgr.getGroupIdNum();
+                streamIdnum = (clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null)
+                        ? clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) : 0;
             }
-            encodeObject.setBidNum(bidnum);
-            encodeObject.setTidNum(tidnum);
-            if (bidnum == 0 || tidnum == 0) {
-                encodeObject.setBidTransfer(false);
+            encodeObject.setGroupIdNum(groupIdnum);
+            encodeObject.setStreamIdNum(streamIdnum);
+            if (groupIdnum == 0 || streamIdnum == 0) {
+                encodeObject.setGroupIdTransfer(false);
             }
         }
         if (this.configure.isNeedDataEncry()) {
@@ -495,8 +495,8 @@ public class Sender {
     public void asyncSendMessage(EncodeObject encodeObject,
                                  SendMessageCallback callback, String msgUUID,
                                  long timeout, TimeUnit timeUnit) throws ProxysdkException {
-        metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getBid(),
-                encodeObject.getTid(), Utils.getLocalIp(), encodeObject.getPackageTime(),
+        metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(),
+                encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getPackageTime(),
                 encodeObject.getDt(), encodeObject.getRealCnt());
 
         // send message package time
@@ -546,17 +546,17 @@ public class Sender {
             logger.warn("message id {} has existed.", encodeObject.getMessageId());
         }
         if (encodeObject.getMsgtype() == 7) {
-            int bidnum = 0;
-            int tidnum = 0;
-            if ((clientMgr.getBid().length() != 0) && (encodeObject.getBid().equals(clientMgr.getBid()))) {
-                bidnum = clientMgr.getBidNum();
-                tidnum = (clientMgr.getTidMap().get(encodeObject.getTid()) != null) ? clientMgr
-                        .getTidMap().get(encodeObject.getTid()) : 0;
+            int groupIdnum = 0;
+            int streamIdnum = 0;
+            if ((clientMgr.getGroupId().length() != 0) && (encodeObject.getGroupId().equals(clientMgr.getGroupId()))) {
+                groupIdnum = clientMgr.getGroupIdNum();
+                streamIdnum = (clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null) ? clientMgr
+                        .getStreamIdMap().get(encodeObject.getStreamId()) : 0;
             }
-            encodeObject.setBidNum(bidnum);
-            encodeObject.setTidNum(tidnum);
-            if (bidnum == 0 || tidnum == 0) {
-                encodeObject.setBidTransfer(false);
+            encodeObject.setGroupIdNum(groupIdnum);
+            encodeObject.setStreamIdNum(streamIdnum);
+            if (groupIdnum == 0 || streamIdnum == 0) {
+                encodeObject.setGroupIdTransfer(false);
             }
         }
         if (this.configure.isNeedDataEncry()) {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/threads/MetricWorkerThread.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/threads/MetricWorkerThread.java
index 47b5b60..1ecd39a 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/threads/MetricWorkerThread.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/threads/MetricWorkerThread.java
@@ -75,19 +75,19 @@ public class MetricWorkerThread extends Thread implements Closeable {
     /**
      * get string key
      *
-     * @param bid     - bid
-     * @param tid     - tid
+     * @param groupId     - groupId
+     * @param streamId     - streamId
      * @param localIp - ip
      * @return
      */
-    private String getKeyStringByConfig(String bid, String tid, String localIp, long keyTime) {
+    private String getKeyStringByConfig(String groupId, String streamId, String localIp, long keyTime) {
         StringBuilder builder = new StringBuilder();
-        String bidStr = proxyClientConfig.isUseBidAsKey() ? bid : DEFAULT_KEY_ITEM;
-        String tidStr = proxyClientConfig.isUseTidAsKey() ? tid : DEFAULT_KEY_ITEM;
+        String groupIdStr = proxyClientConfig.isUseGroupIdAsKey() ? groupId : DEFAULT_KEY_ITEM;
+        String streamIdStr = proxyClientConfig.isUseStreamIdAsKey() ? streamId : DEFAULT_KEY_ITEM;
         String localIpStr = proxyClientConfig.isUseLocalIpAsKey() ? localIp : DEFAULT_KEY_ITEM;
 
-        builder.append(bidStr).append(DEFAULT_KEY_SPLITTER)
-                .append(tidStr).append(DEFAULT_KEY_SPLITTER)
+        builder.append(groupIdStr).append(DEFAULT_KEY_SPLITTER)
+                .append(streamIdStr).append(DEFAULT_KEY_SPLITTER)
                 .append(localIpStr).append(DEFAULT_KEY_SPLITTER)
                 .append(keyTime);
         return builder.toString();
@@ -97,18 +97,19 @@ public class MetricWorkerThread extends Thread implements Closeable {
      * record num
      *
      * @param msgId    - msg uuid
-     * @param bid      - bid
-     * @param tid      - tid
+     * @param groupId      - groupId
+     * @param streamId      - streamId
      * @param localIp  - ip
      * @param packTime - package time
      * @param dt       - dt
      * @param num      - num
      */
-    public void recordNumByKey(String msgId, String bid, String tid, String localIp, long packTime, long dt, int num) {
+    public void recordNumByKey(String msgId, String groupId, String streamId,
+                               String localIp, long packTime, long dt, int num) {
         if (!enableSlaMetric) {
             return;
         }
-        MessageRecord messageRecord = new MessageRecord(bid, tid, localIp, msgId,
+        MessageRecord messageRecord = new MessageRecord(groupId, streamId, localIp, msgId,
                 getFormatKeyTime(dt), getFormatKeyTime(packTime), num);
 
         metricValueCache.putIfAbsent(msgId, messageRecord);
@@ -135,9 +136,9 @@ public class MetricWorkerThread extends Thread implements Closeable {
         MessageRecord messageRecord = metricValueCache.remove(msgId);
         if (messageRecord != null) {
 
-            String packTimeKeyName = getKeyStringByConfig(messageRecord.getBid(), messageRecord.getTid(),
+            String packTimeKeyName = getKeyStringByConfig(messageRecord.getGroupId(), messageRecord.getStreamId(),
                     messageRecord.getLocalIp(), messageRecord.getPackTime());
-            String dtKeyName = getKeyStringByConfig(messageRecord.getBid(), messageRecord.getTid(),
+            String dtKeyName = getKeyStringByConfig(messageRecord.getGroupId(), messageRecord.getStreamId(),
                     messageRecord.getLocalIp(), messageRecord.getDt());
 
             MetricTimeNumSummary packTimeSummary = getMetricSummary(packTimeKeyName,
@@ -160,9 +161,9 @@ public class MetricWorkerThread extends Thread implements Closeable {
         MessageRecord messageRecord = metricValueCache.remove(msgId);
         if (messageRecord != null) {
 
-            String packTimeKeyName = getKeyStringByConfig(messageRecord.getBid(), messageRecord.getTid(),
+            String packTimeKeyName = getKeyStringByConfig(messageRecord.getGroupId(), messageRecord.getStreamId(),
                     messageRecord.getLocalIp(), messageRecord.getPackTime());
-            String dtKeyName = getKeyStringByConfig(messageRecord.getBid(), messageRecord.getTid(),
+            String dtKeyName = getKeyStringByConfig(messageRecord.getGroupId(), messageRecord.getStreamId(),
                     messageRecord.getLocalIp(), messageRecord.getDt());
 
             MetricTimeNumSummary packTimeSummary = getMetricSummary(packTimeKeyName,
@@ -213,11 +214,11 @@ public class MetricWorkerThread extends Thread implements Closeable {
         }
     }
 
-    private void sendSingleLine(String line, String tid, long dtTime) {
+    private void sendSingleLine(String line, String streamId, long dtTime) {
         EncodeObject encodeObject = new EncodeObject(line.getBytes(), 7,
                 false, false, false,
                 dtTime, idGenerator.getNextInt(),
-                proxyClientConfig.getMetricBid(), tid, "", "", Utils.getLocalIp());
+                proxyClientConfig.getMetricGroupId(), streamId, "", "", Utils.getLocalIp());
         MetricSendCallBack callBack = new MetricSendCallBack(encodeObject);
         tryToSendMetricToManager(encodeObject, callBack);
     }
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ProxyUtils.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ProxyUtils.java
index a786fe6..20be209 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ProxyUtils.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ProxyUtils.java
@@ -35,7 +35,7 @@ public class ProxyUtils {
     private static final Set<String> invalidAttr = new HashSet<>();
 
     static {
-        Collections.addAll(invalidAttr, "bid", "tid", "dt", "msgUUID", "cp",
+        Collections.addAll(invalidAttr, "groupId", "streamId", "dt", "msgUUID", "cp",
             "cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", "_file_status_check", "_secretId",
             "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyDesKey");
     }
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ServiceDiscoveryUtils.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ServiceDiscoveryUtils.java
index ebe88f5..05d05a7 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ServiceDiscoveryUtils.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ServiceDiscoveryUtils.java
@@ -224,7 +224,7 @@ public class ServiceDiscoveryUtils {
         try {
             File managerIpListFile = new File(managerIpLocalPath);
             if (!managerIpListFile.exists()) {
-                log.info("ServiceDiscovery no found local bidInfo file, "
+                log.info("ServiceDiscovery no found local groupIdInfo file, "
                         + "doesn't matter, path is [" + managerIpLocalPath + "].");
                 return null;
             }
diff --git a/inlong-dataproxy/bin/prepare_env.sh b/inlong-dataproxy/bin/prepare_env.sh
index 5cc59f4..7883b4e 100755
--- a/inlong-dataproxy/bin/prepare_env.sh
+++ b/inlong-dataproxy/bin/prepare_env.sh
@@ -21,7 +21,7 @@
 
 cd "$(dirname "$0")"/../conf
 
-for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,bid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties}
+for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,groupid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties}
   do
     touch $i
 done
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/ProxyMessage.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/ProxyMessage.java
index 0a0b24c..1be867e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/ProxyMessage.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/ProxyMessage.java
@@ -23,28 +23,28 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants;
 
 public class ProxyMessage {
 
-    private String bid;
+    private String groupId;
     private String topic;
-    private String tid;
+    private String streamId;
 
     private Map<String, String> attributeMap;
 
     private byte[] data;
 
-    public ProxyMessage(String bid, String tid, Map<String, String> attributeMap, byte[] data) {
-        this.bid = bid;
-        this.tid = tid;
+    public ProxyMessage(String groupId, String streamId, Map<String, String> attributeMap, byte[] data) {
+        this.groupId = groupId;
+        this.streamId = streamId;
         this.attributeMap = attributeMap;
         this.data = data;
     }
 
-    public String getBid() {
-        return bid;
+    public String getGroupId() {
+        return groupId;
     }
 
-    public void setBid(String bid) {
-        this.bid = bid;
-        this.attributeMap.put(AttributeConstants.BUSINESS_ID, bid);
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+        this.attributeMap.put(AttributeConstants.GROUP_ID, groupId);
     }
 
     public String getTopic() {
@@ -55,13 +55,13 @@ public class ProxyMessage {
         this.topic = topic;
     }
 
-    public String getTid() {
-        return tid;
+    public String getStreamId() {
+        return streamId;
     }
 
-    public void setTid(String tid) {
-        this.tid = tid;
-        this.attributeMap.put(AttributeConstants.INTERFACE_ID, tid);
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+        this.attributeMap.put(AttributeConstants.INTERFACE_ID, streamId);
     }
 
     public Map<String, String> getAttributeMap() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 8fbbd33..1c3caa5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -34,7 +34,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
 import org.apache.inlong.dataproxy.config.RemoteConfigJson.DataItem;
-import org.apache.inlong.dataproxy.config.holder.BidPropertiesHolder;
+import org.apache.inlong.dataproxy.config.holder.GroupIdPropertiesHolder;
 import org.apache.inlong.dataproxy.config.holder.FileConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.MxPropertiesHolder;
 import org.apache.inlong.dataproxy.config.holder.PropertiesConfigHolder;
@@ -57,8 +57,8 @@ public class ConfigManager {
     private final PropertiesConfigHolder topicConfig =
             new PropertiesConfigHolder("topics.properties");
     private final MxPropertiesHolder mxConfig = new MxPropertiesHolder("mx.properties");
-    private final BidPropertiesHolder bidConfig =
-            new BidPropertiesHolder("bid_mapping.properties");
+    private final GroupIdPropertiesHolder groupIdConfig =
+            new GroupIdPropertiesHolder("groupid_mapping.properties");
     private final PropertiesConfigHolder dcConfig =
             new PropertiesConfigHolder("dc_mapping.properties");
     private final PropertiesConfigHolder transferConfig =
@@ -168,16 +168,16 @@ public class ConfigManager {
         return mxConfig.getMxPropertiesMaps();
     }
 
-    public Map<String, String> getBidMappingProperties() {
-        return bidConfig.getBidMappingProperties();
+    public Map<String, String> getGroupIdMappingProperties() {
+        return groupIdConfig.getGroupIdMappingProperties();
     }
 
-    public Map<String, Map<String, String>> getTidMappingProperties() {
-        return bidConfig.getTidMappingProperties();
+    public Map<String, Map<String, String>> getStreamIdMappingProperties() {
+        return groupIdConfig.getStreamIdMappingProperties();
     }
 
-    public Map<String, String> getBidEnableMappingProperties() {
-        return bidConfig.getBidEnableMappingProperties();
+    public Map<String, String> getGroupIdEnableMappingProperties() {
+        return groupIdConfig.getGroupIdEnableMappingProperties();
     }
 
     public Map<String, String> getCommonProperties() {
@@ -258,19 +258,19 @@ public class ConfigManager {
                 // request with post
                 CloseableHttpResponse response = httpClient.execute(httpGet);
                 String returnStr = EntityUtils.toString(response.getEntity());
-                // get bid <-> topic and m value.
+                // get groupId <-> topic and m value.
 
                 RemoteConfigJson configJson = gson.fromJson(returnStr, RemoteConfigJson.class);
-                Map<String, String> bidToTopic = new HashMap<String, String>();
-                Map<String, String> bidToMValue = new HashMap<String, String>();
+                Map<String, String> groupIdToTopic = new HashMap<String, String>();
+                Map<String, String> groupIdToMValue = new HashMap<String, String>();
 
                 if (configJson.getErrCode() == 0) {
                     for (DataItem item : configJson.getData()) {
-                        bidToMValue.put(item.getBid(), item.getM());
-                        bidToTopic.put(item.getBid(), item.getTopic());
+                        groupIdToMValue.put(item.getGroupId(), item.getM());
+                        groupIdToTopic.put(item.getGroupId(), item.getTopic());
                     }
-                    configManager.addMxProperties(bidToMValue);
-                    configManager.addTopicProperties(bidToTopic);
+                    configManager.addMxProperties(groupIdToMValue);
+                    configManager.addTopicProperties(groupIdToTopic);
                 }
             } catch (Exception ex) {
                 LOG.error("exception caught", ex);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
index 44e932e..91b5413 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
@@ -35,12 +35,12 @@ public class RemoteConfigJson {
 
     public static class DataItem {
 
-        private String bid;
+        private String groupId;
         private String topic;
         private String m;
 
-        public String getBid() {
-            return bid;
+        public String getGroupId() {
+            return groupId;
         }
 
         public String getTopic() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
index 6137886..ae8a035 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
@@ -187,7 +187,7 @@ public class RemoteConfigManager implements IRepository {
             // request with get
             CloseableHttpResponse response = httpClient.execute(httpGet);
             String returnStr = EntityUtils.toString(response.getEntity());
-            // get bid <-> topic and m value.
+            // get groupId <-> topic and m value.
 
             DataProxyConfigResponse proxyResponse = gson.fromJson(returnStr, DataProxyConfigResponse.class);
             if (!proxyResponse.isResult()) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BidPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
similarity index 54%
rename from inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BidPropertiesHolder.java
rename to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
index 5bb6f82..896b059 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BidPropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
@@ -25,21 +25,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * bid to m value
+ * groupId mapping, streamId mapping and groupId enable-flag properties, parsed from "#"-separated keys
  */
-public class BidPropertiesHolder extends PropertiesConfigHolder {
+public class GroupIdPropertiesHolder extends PropertiesConfigHolder {
 
-    private static final Logger LOG = LoggerFactory.getLogger(BidPropertiesHolder.class);
-    private static final String BID_VALUE_SPLITTER = "#";
+    private static final Logger LOG = LoggerFactory.getLogger(GroupIdPropertiesHolder.class);
+    private static final String GROUPID_VALUE_SPLITTER = "#";
 
-    private Map<String, String> bidMappingProperties =
+    private Map<String, String> groupIdMappingProperties =
             new HashMap<String, String>();
-    private Map<String, Map<String, String>> tidMappingProperties =
+    private Map<String, Map<String, String>> streamIdMappingProperties =
             new HashMap<String, Map<String, String>>();
-    private Map<String, String> bidEnableMappingProperties =
+    private Map<String, String> groupIdEnableMappingProperties =
             new HashMap<String, String>();
 
-    public BidPropertiesHolder(String fileName) {
+    public GroupIdPropertiesHolder(String fileName) {
         super(fileName);
     }
 
@@ -47,41 +47,41 @@ public class BidPropertiesHolder extends PropertiesConfigHolder {
     public void loadFromFileToHolder() {
         super.loadFromFileToHolder();
         try {
-            Map<String, String> tmpBidMappingProperties =
+            Map<String, String> tmpGroupIdMappingProperties =
                     new HashMap<String, String>();
-            Map<String, Map<String, String>> tmpTidMappingProperties =
+            Map<String, Map<String, String>> tmpStreamIdMappingProperties =
                     new HashMap<String, Map<String, String>>();
-            Map<String, String> tmpBidEnableMappingProperties = new HashMap<String, String>();
+            Map<String, String> tmpGroupIdEnableMappingProperties = new HashMap<String, String>();
             for (Map.Entry<String, String> entry : super.getHolder().entrySet()) {
-                String[] sArray = StringUtils.split(entry.getKey(), BID_VALUE_SPLITTER);
+                String[] sArray = StringUtils.split(entry.getKey(), GROUPID_VALUE_SPLITTER);
                 if (sArray.length != 3) {
-                    LOG.warn("invalid bid key {}", entry.getKey());
+                    LOG.warn("invalid groupId key {}", entry.getKey());
                     continue;
                 }
-                tmpBidMappingProperties.put(sArray[0].trim(), sArray[1].trim());
-                tmpBidEnableMappingProperties.put(sArray[0].trim(), sArray[2].trim());
+                tmpGroupIdMappingProperties.put(sArray[0].trim(), sArray[1].trim());
+                tmpGroupIdEnableMappingProperties.put(sArray[0].trim(), sArray[2].trim());
                 if (StringUtils.isNotBlank(entry.getValue())) {
-                    tmpTidMappingProperties.put(sArray[0].trim(),
+                    tmpStreamIdMappingProperties.put(sArray[0].trim(),
                             MAP_SPLITTER.split(entry.getValue()));
                 }
             }
-            bidMappingProperties = tmpBidMappingProperties;
-            tidMappingProperties = tmpTidMappingProperties;
-            bidEnableMappingProperties = tmpBidEnableMappingProperties;
+            groupIdMappingProperties = tmpGroupIdMappingProperties;
+            streamIdMappingProperties = tmpStreamIdMappingProperties;
+            groupIdEnableMappingProperties = tmpGroupIdEnableMappingProperties;
         } catch (Exception e) {
             LOG.error("loadConfig error :", e);
         }
     }
 
-    public Map<String, String> getBidMappingProperties() {
-        return bidMappingProperties;
+    public Map<String, String> getGroupIdMappingProperties() {
+        return groupIdMappingProperties;
     }
 
-    public Map<String, Map<String, String>> getTidMappingProperties() {
-        return tidMappingProperties;
+    public Map<String, Map<String, String>> getStreamIdMappingProperties() {
+        return streamIdMappingProperties;
     }
 
-    public Map<String, String> getBidEnableMappingProperties() {
-        return bidEnableMappingProperties;
+    public Map<String, String> getGroupIdEnableMappingProperties() {
+        return groupIdEnableMappingProperties;
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
index 168ea87..b6f46da 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
@@ -54,27 +54,27 @@ public class ConfigMessageServlet extends HttpServlet {
     }
 
     private boolean handleTopicConfig(RequestContent requestContent) {
-        Map<String, String> bidToTopic = new HashMap<String, String>();
+        Map<String, String> groupIdToTopic = new HashMap<String, String>();
         for (Map<String, String> item : requestContent.getContent()) {
-            bidToTopic.put(item.get("bid"), item.get("topic"));
+            groupIdToTopic.put(item.get("groupId"), item.get("topic"));
         }
         if ("add".equals(requestContent.getOperationType())) {
-            return configManager.addTopicProperties(bidToTopic);
+            return configManager.addTopicProperties(groupIdToTopic);
         } else if ("delete".equals(requestContent.getOperationType())) {
-            return configManager.deleteTopicProperties(bidToTopic);
+            return configManager.deleteTopicProperties(groupIdToTopic);
         }
         return false;
     }
 
     private boolean handleMxConfig(RequestContent requestContent) {
-        Map<String, String> bidToMValue = new HashMap<String, String>();
+        Map<String, String> groupIdToMValue = new HashMap<String, String>();
         for (Map<String, String> item : requestContent.getContent()) {
-            bidToMValue.put(item.get("bid"), item.get("m"));
+            groupIdToMValue.put(item.get("groupId"), item.get("m"));
         }
         if ("add".equals(requestContent.getOperationType())) {
-            return configManager.addMxProperties(bidToMValue);
+            return configManager.addMxProperties(groupIdToMValue);
         } else if ("delete".equals(requestContent.getOperationType())) {
-            return configManager.deleteMxProperties(bidToMValue);
+            return configManager.deleteMxProperties(groupIdToMValue);
         }
         return false;
     }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
index bb63a7b..7290089 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
@@ -29,18 +29,18 @@ public interface AttributeConstants {
     String OPERATION_CONTENT = "content";
 
     /**
-     * business id unique string id for each business or product
+     * group id: unique string id for each business or product
      */
-    String BUSINESS_ID = "bid";
+    String GROUP_ID = "groupId";
 
     /**
      * interface id unique string id for each interface of business An interface stand for a kind of
      * data
      */
-    String INTERFACE_ID = "tid";
+    String INTERFACE_ID = "streamId";
 
     /**
-     * iname is like a tid but used in file protocol(m=xxx)
+     * iname is like a streamId, but is used in the file protocol (m=xxx)
      */
     String INAME = "iname";
 
@@ -77,7 +77,7 @@ public interface AttributeConstants {
 
     String NUM2NAME = "num2name";
 
-    String BID_NUM = "bidnum";
+    String GROUPID_NUM = "groupIdnum";
 
-    String TID_NUM = "tidnum";
+    String STREAMID_NUM = "streamIdnum";
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index b906386..fedcc80 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -99,7 +99,7 @@ public class ConfigConstants {
     public static final String FILE_CHECK_DATA = "file-check-data";
     public static final String MINUTE_CHECK_DATA = "minute-check-data";
     public static final String SLA_METRIC_DATA = "sla-metric-data";
-    public static final String SLA_METRIC_BID = "manager_sla_metric";
+    public static final String SLA_METRIC_GROUPID = "manager_sla_metric";
 
     public static final String FILE_BODY = "file-body";
     public static final int MSG_MAX_LENGTH_BYTES = 20 * 1024 * 1024;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
index 658dea9..034b2b4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
@@ -89,8 +89,8 @@ public class MetaSink extends AbstractSink implements Configurable {
     private static String MAX_TOPICS_EACH_PRODUCER_HOLD_NAME = "max-topic-each-producer-hold";
 
     private static final String LOG_TOPIC = "proxy-log-topic";
-    private static final String TID = "proxy-log-tid";
-    private static final String BID = "proxy-log-bid";
+    private static final String STREAMID = "proxy-log-streamid";
+    private static final String GROUPID = "proxy-log-groupid";
     private static final String SEND_REMOTE = "send-remote";
     private static final String topicsFilePath = "topics.properties";
     private static final String slaTopicFilePath = "slaTopics.properties";
@@ -104,8 +104,8 @@ public class MetaSink extends AbstractSink implements Configurable {
     private int maxSurvivedSize = 100000;
 
     private String proxyLogTopic = "teg_manager";
-    private String proxyLogBid = "b_teg_manager";
-    private String proxyLogTid = "proxy_measure_log";
+    private String proxyLogGroupId = "b_teg_manager";
+    private String proxyLogStreamId = "proxy_measure_log";
     private boolean sendRemote = false;
     private ConfigManager configManager;
     private Map<String, String> topicProperties;
@@ -371,13 +371,13 @@ public class MetaSink extends AbstractSink implements Configurable {
                 } else {
                     Message message = new Message(topic, event.getBody());
                     message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
-                    String tid = "";
+                    String streamId = "";
                     if (event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
-                        tid = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
+                        streamId = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
                     } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-                        tid = event.getHeaders().get(AttributeConstants.INAME);
+                        streamId = event.getHeaders().get(AttributeConstants.INAME);
                     }
-                    message.putSystemHeader(tid, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+                    message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
 
                     producer.sendMessage(message, new MyCallback(es));
                     flag.set(true);
@@ -399,13 +399,13 @@ public class MetaSink extends AbstractSink implements Configurable {
 
                     Message message = new Message(topic, event.getBody());
                     message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
-                    String tid = "";
+                    String streamId = "";
                     if (event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
-                        tid = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
+                        streamId = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
                     } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-                        tid = event.getHeaders().get(AttributeConstants.INAME);
+                        streamId = event.getHeaders().get(AttributeConstants.INAME);
                     }
-                    message.putSystemHeader(tid, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+                    message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
 
                     producer.sendMessage(message, new MyCallback(es));
                     flag.set(true);
@@ -700,8 +700,8 @@ public class MetaSink extends AbstractSink implements Configurable {
         }
         if (sendRemote) {
             proxyLogTopic = context.getString(LOG_TOPIC, proxyLogTopic);
-            proxyLogBid = context.getString(BID, proxyLogTid);
-            proxyLogTid = context.getString(TID, proxyLogTid);
+            proxyLogGroupId = context.getString(GROUPID, proxyLogGroupId);
+            proxyLogStreamId = context.getString(STREAMID, proxyLogStreamId);
         }
 
         resendQueue = new LinkedBlockingQueue<>(BAD_EVENT_QUEUE_SIZE);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index e56853a..0fb50ba 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -413,16 +413,16 @@ public class PulsarSink extends AbstractSink implements Configurable,
 
     private void editStatistic(final Event event, String keyPostfix, String msgId) {
         String topic = "";
-        String tid = "";
+        String streamId = "";
         String nodeIp = null;
         if (event != null) {
             if (event.getHeaders().containsKey(TOPIC)) {
                 topic = event.getHeaders().get(TOPIC);
             }
             if (event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
-                tid = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
+                streamId = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
             } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-                tid = event.getHeaders().get(AttributeConstants.INAME);
+                streamId = event.getHeaders().get(AttributeConstants.INAME);
             }
 
             /*
@@ -459,12 +459,12 @@ public class PulsarSink extends AbstractSink implements Configurable,
                     }
 
                     /*
-                     * SINK_INTF#metasink1#topic#tid#clientIp#busIP#pkgTime#successCnt#packcnt
+                     * SINK_INTF#metasink1#topic#streamId#clientIp#busIP#pkgTime#successCnt#packcnt
                      * #packsize#failCnt
                      */
                     StringBuilder newbase = new StringBuilder();
                     newbase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
-                            .append(tid).append(SEPARATOR).append(nodeIp)
+                            .append(streamId).append(SEPARATOR).append(nodeIp)
                             .append(SEPARATOR).append(NetworkUtils.getLocalIp())
                             .append(SEPARATOR).append(msgId).append(SEPARATOR)
                             .append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 7aae1b6..ebf0822 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -155,13 +155,13 @@ public class PulsarClientService {
 
         Map<String, String> proMap = new HashMap<>();
         proMap.put("tdbusip", localIp);
-        String tid = "";
+        String streamId = "";
         if (event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
-            tid = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
+            streamId = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
         } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-            tid = event.getHeaders().get(AttributeConstants.INAME);
+            streamId = event.getHeaders().get(AttributeConstants.INAME);
         }
-        proMap.put(tid, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+        proMap.put(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
         logger.debug("producer send msg!");
         TopicProducerInfo forCallBackP = producer;
         forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index 468e9f6..6d36fbf 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -255,8 +255,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
                             + ";Connection info:" + channel.toString()));
         }
 
-        int bidNum = cb.readUnsignedShort();
-        int tidNum = cb.readUnsignedShort();
+        int groupIdNum = cb.readUnsignedShort();
+        int streamIdNum = cb.readUnsignedShort();
         final int extendField = cb.readUnsignedShort();
         long dataTime = cb.readUnsignedInt();
         int msgCount = cb.readUnsignedShort();
@@ -299,35 +299,35 @@ public class DefaultServiceDecoder implements ServiceDecoder {
             ByteBuffer dataBuf = handleTrace(channel, cb, extendField, msgHeadPos,
                 totalDataLen, attrLen, strAttr, bodyLen);
 
-            String bid = null;
-            String tid = null;
+            String groupId = null;
+            String streamId = null;
 
-            if (commonAttrMap.containsKey(AttributeConstants.BUSINESS_ID)) {
-                bid = commonAttrMap.get(AttributeConstants.BUSINESS_ID);
+            if (commonAttrMap.containsKey(AttributeConstants.GROUP_ID)) {
+                groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
             }
             if (commonAttrMap.containsKey(AttributeConstants.INTERFACE_ID)) {
-                tid = commonAttrMap.get(AttributeConstants.INTERFACE_ID);
+                streamId = commonAttrMap.get(AttributeConstants.INTERFACE_ID);
             }
 
-            if ((bid != null) && (tid != null)) {
+            if ((groupId != null) && (streamId != null)) {
                 commonAttrMap.put(AttributeConstants.NUM2NAME, "FALSE");
                 dataBuf.putShort(BIN_MSG_EXTEND_OFFSET, (short) (extendField | 0x4));
             } else {
-                boolean hasNumBid = (((extendField & 0x4) >> 2) == 0x0);
-                if (hasNumBid && (0 != bidNum) && (0 != tidNum)) {
+                boolean hasNumGroupId = (((extendField & 0x4) >> 2) == 0x0);
+                if (hasNumGroupId && (0 != groupIdNum) && (0 != streamIdNum)) {
                     commonAttrMap.put(AttributeConstants.NUM2NAME, "TRUE");
-                    commonAttrMap.put(AttributeConstants.BID_NUM, String.valueOf(bidNum));
-                    commonAttrMap.put(AttributeConstants.TID_NUM, String.valueOf(tidNum));
+                    commonAttrMap.put(AttributeConstants.GROUPID_NUM, String.valueOf(groupIdNum));
+                    commonAttrMap.put(AttributeConstants.STREAMID_NUM, String.valueOf(streamIdNum));
                 }
             }
 
             if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType) && !index) {
                 List<ProxyMessage> msgList = new ArrayList<>(1);
-                msgList.add(new ProxyMessage(bid, tid, commonAttrMap, dataBuf.array()));
+                msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, dataBuf.array()));
                 resultMap.put(ConfigConstants.MSG_LIST, msgList);
             } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
                 List<ProxyMessage> msgList = new ArrayList<>(1);
-                msgList.add(new ProxyMessage(bid, tid, commonAttrMap,
+                msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap,
                         (byte[]) resultMap.get(ConfigConstants.FILE_BODY)));
                 resultMap.put(ConfigConstants.MSG_LIST, msgList);
             }
@@ -411,8 +411,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
 
         // fill up attr map with some keys.
         commonAttrMap.put(AttributeConstants.RCV_TIME, String.valueOf(System.currentTimeMillis()));
-        String bid = commonAttrMap.get(AttributeConstants.BUSINESS_ID);
-        String tid = commonAttrMap.get(AttributeConstants.INTERFACE_ID);
+        String groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
+        String streamId = commonAttrMap.get(AttributeConstants.INTERFACE_ID);
 
         // add message count attr
         String cntStr = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
@@ -433,12 +433,12 @@ public class DefaultServiceDecoder implements ServiceDecoder {
                 byte[] record = new byte[singleMsgLen];
                 bodyBuffer.get(record);
 
-                ProxyMessage message = new ProxyMessage(bid, tid, commonAttrMap, record);
+                ProxyMessage message = new ProxyMessage(groupId, streamId, commonAttrMap, record);
                 msgList.add(message);
             }
         } else {
             msgList = new ArrayList<>(1);
-            msgList.add(new ProxyMessage(bid, tid, commonAttrMap, bodyData));
+            msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, bodyData));
         }
         resultMap.put(ConfigConstants.MSG_LIST, msgList);
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 46d5b35..eadf2bc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.dataproxy.source;
 
 import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_BID;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
 import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
 import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
 
@@ -214,29 +214,29 @@ public class ServerMessageHandler extends SimpleChannelHandler {
         }
     }
 
-    private void checkBidInfo(ProxyMessage message, Map<String, String> commonAttrMap,
+    private void checkGroupIdInfo(ProxyMessage message, Map<String, String> commonAttrMap,
         Map<String, String> attrMap, AtomicReference<String> topicInfo) {
-        String bid = message.getBid();
-        String tid;
-        if (null != bid) {
+        String groupId = message.getGroupId();
+        String streamId;
+        if (null != groupId) {
             String from = commonAttrMap.get(AttributeConstants.FROM);
             if ("dc".equals(from)) {
-                String dcInterfaceId = message.getTid();
+                String dcInterfaceId = message.getStreamId();
                 if (StringUtils.isNotEmpty(dcInterfaceId)
                     && configManager.getDcMappingProperties()
                     .containsKey(dcInterfaceId.trim())) {
-                    bid = configManager.getDcMappingProperties()
+                    groupId = configManager.getDcMappingProperties()
                         .get(dcInterfaceId.trim()).trim();
-                    message.setBid(bid);
+                    message.setGroupId(groupId);
                 }
             }
 
-            String value = configManager.getTopicProperties().get(bid);
+            String value = configManager.getTopicProperties().get(groupId);
             if (StringUtils.isNotEmpty(value)) {
                 topicInfo.set(value.trim());
             }
 
-            Map<String, String> mxValue = configManager.getMxPropertiesMaps().get(bid);
+            Map<String, String> mxValue = configManager.getMxPropertiesMaps().get(groupId);
             if (mxValue != null && mxValue.size() != 0) {
                 message.getAttributeMap().putAll(mxValue);
             } else {
@@ -244,30 +244,30 @@ public class ServerMessageHandler extends SimpleChannelHandler {
             }
         } else {
             String num2name = commonAttrMap.get(AttributeConstants.NUM2NAME);
-            String bidNum = commonAttrMap.get(AttributeConstants.BID_NUM);
-            String tidNum = commonAttrMap.get(AttributeConstants.TID_NUM);
-
-            if (configManager.getBidMappingProperties() != null
-                && configManager.getTidMappingProperties() != null) {
-                bid = configManager.getBidMappingProperties().get(bidNum);
-                tid = (configManager.getTidMappingProperties().get(bidNum) == null)
-                    ? null : configManager.getTidMappingProperties().get(bidNum).get(tidNum);
-                if (bid != null && tid != null) {
+            String groupIdNum = commonAttrMap.get(AttributeConstants.GROUPID_NUM);
+            String streamIdNum = commonAttrMap.get(AttributeConstants.STREAMID_NUM);
+
+            if (configManager.getGroupIdMappingProperties() != null
+                && configManager.getStreamIdMappingProperties() != null) {
+                groupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
+                streamId = (configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
+                    ? null : configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+                if (groupId != null && streamId != null) {
                     String enableTrans =
-                        (configManager.getBidEnableMappingProperties() == null)
-                            ? null : configManager.getBidEnableMappingProperties().get(bidNum);
+                        (configManager.getGroupIdEnableMappingProperties() == null)
+                            ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
                     if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE"
                         .equalsIgnoreCase(num2name))) {
-                        String extraAttr = "bid=" + bid + "&" + "tid=" + tid;
+                        String extraAttr = "groupId=" + groupId + "&" + "streamId=" + streamId;
                         message.setData(newBinMsg(message.getData(), extraAttr));
                     }
 
-                    attrMap.put(AttributeConstants.BUSINESS_ID, bid);
-                    attrMap.put(AttributeConstants.INTERFACE_ID, tid);
-                    message.setBid(bid);
-                    message.setTid(tid);
+                    attrMap.put(AttributeConstants.GROUP_ID, groupId);
+                    attrMap.put(AttributeConstants.INTERFACE_ID, streamId);
+                    message.setGroupId(groupId);
+                    message.setStreamId(streamId);
 
-                    String value = configManager.getTopicProperties().get(bid);
+                    String value = configManager.getTopicProperties().get(groupId);
                     if (StringUtils.isNotEmpty(value)) {
                         topicInfo.set(value.trim());
                     }
@@ -285,25 +285,25 @@ public class ServerMessageHandler extends SimpleChannelHandler {
             String topic = this.defaultTopic;
 
             AtomicReference<String> topicInfo = new AtomicReference<>(topic);
-            checkBidInfo(message, commonAttrMap, attrMap, topicInfo);
+            checkGroupIdInfo(message, commonAttrMap, attrMap, topicInfo);
             topic = topicInfo.get();
 
-//                if(bid==null)bid="b_test";//default bid
+//                if(groupId==null)groupId="b_test";//default groupId
 
             message.setTopic(topic);
             commonAttrMap.put(AttributeConstants.NODE_IP, strRemoteIP);
 
-            String bid = message.getBid();
-            String tid = message.getTid();
+            String groupId = message.getGroupId();
+            String streamId = message.getStreamId();
 
             // whether sla
-            if (SLA_METRIC_BID.equals(bid)) {
+            if (SLA_METRIC_GROUPID.equals(groupId)) {
                 commonAttrMap.put(SLA_METRIC_DATA, "true");
                 message.setTopic(SLA_METRIC_DATA);
             }
 
-            if (bid != null && tid != null) {
-                String tubeSwtichKey = bid + SEPARATOR + tid;
+            if (groupId != null && streamId != null) {
+                String tubeSwtichKey = groupId + SEPARATOR + streamId;
                 if (configManager.getTubeSwitchProperties().get(tubeSwtichKey) != null
                     && "false".equals(configManager.getTubeSwitchProperties()
                     .get(tubeSwtichKey).trim())) {
@@ -326,14 +326,14 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 }
             }
 
-            if (tid == null) {
-                tid = "";
+            if (streamId == null) {
+                streamId = "";
             }
-            HashMap<String, List<ProxyMessage>> tidMsgMap = messageMap
+            HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap
                 .computeIfAbsent(topic, k -> new HashMap<>());
-            List<ProxyMessage> tidMsgList = tidMsgMap
-                .computeIfAbsent(tid, k -> new ArrayList<>());
-            tidMsgList.add(message);
+            List<ProxyMessage> streamIdMsgList = streamIdMsgMap
+                .computeIfAbsent(streamId, k -> new ArrayList<>());
+            streamIdMsgList.add(message);
         }
     }
 
@@ -349,11 +349,11 @@ public class ServerMessageHandler extends SimpleChannelHandler {
         }
 
         for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) {
-            for (Map.Entry<String, List<ProxyMessage>> tidEntry : topicEntry.getValue().entrySet()) {
+            for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
 
                 TDMsg1 tdMsg = TDMsg1.newTDMsg(this.isCompressed, tdMsgVer);
                 Map<String, String> headers = new HashMap<String, String>();
-                for (ProxyMessage message : tidEntry.getValue()) {
+                for (ProxyMessage message : streamIdEntry.getValue()) {
                     if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
                         message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
                         tdMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
@@ -382,7 +382,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 }
 
                 headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
-                headers.put(AttributeConstants.INTERFACE_ID, tidEntry.getKey());
+                headers.put(AttributeConstants.INTERFACE_ID, streamIdEntry.getKey());
                 headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
                 headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
                 // every message share the same msg cnt? what if msgType = 5
@@ -396,7 +396,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 if (StringUtils.isNotEmpty(sequenceId)) {
 
                     StringBuilder sidBuilder = new StringBuilder();
-                    sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(tidEntry.getKey())
+                    sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey())
                         .append(SEPARATOR).append(sequenceId);
                     headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
                 }
@@ -412,8 +412,8 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                     throw new MessageIDException(uniqVal,
                         ErrorCode.DT_ERROR,
                         new Throwable("attribute dt=" + headers.get(AttributeConstants.DATA_TIME
-                            + " has error, detail is: topic=" + topicEntry.getKey() + "&tid="
-                            + tidEntry.getKey() + "&NodeIP=" + strRemoteIP), e1));
+                            + " has error, detail is: topic=" + topicEntry.getKey() + "&streamId="
+                            + streamIdEntry.getKey() + "&NodeIP=" + strRemoteIP), e1));
                 }
 
                 dtten = dtten / 1000 / 60 / 10;