You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/04 07:43:14 UTC
[incubator-inlong] branch master updated: [INLONG-1741] [dataproxy]
Modify bid and tid (or dsid) to inlongGroupId and inlongStreamId (#1743)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e8e64ed [INLONG-1741] [dataproxy] Modify bid and tid (or dsid) to inlongGroupId and inlongStreamId (#1743)
e8e64ed is described below
commit e8e64ed35eb84511bb9d7b49d1e42820660db609
Author: dockerzhang <do...@apache.org>
AuthorDate: Thu Nov 4 15:43:09 2021 +0800
[INLONG-1741] [dataproxy] Modify bid and tid (or dsid) to inlongGroupId and inlongStreamId (#1743)
Co-authored-by: dockerzhang <do...@tencent.com>
---
.../inlong/commons/msg/AttributeConstants.java | 8 +-
.../java/org/apache/inlong/commons/msg/TDMsg1.java | 26 +--
.../inlong/dataproxy/DefaultMessageSender.java | 213 +++++++++++----------
.../org/apache/inlong/dataproxy/MessageSender.java | 28 +--
.../apache/inlong/dataproxy/ProxyClientConfig.java | 50 ++---
.../inlong/dataproxy/codec/EncodeObject.java | 99 +++++-----
.../inlong/dataproxy/codec/ProtocolEncoder.java | 10 +-
.../dataproxy/config/EncryptConfigEntry.java | 2 +-
.../inlong/dataproxy/config/ProxyConfigEntry.java | 33 ++--
.../dataproxy/config/ProxyConfigManager.java | 60 +++---
.../org/apache/inlong/dataproxy/demo/Event.java | 32 ++--
.../inlong/dataproxy/http/InternalHttpSender.java | 30 +--
.../inlong/dataproxy/metric/MessageRecord.java | 19 +-
.../apache/inlong/dataproxy/network/ClientMgr.java | 40 ++--
.../inlong/dataproxy/network/HttpMessage.java | 18 +-
.../inlong/dataproxy/network/HttpProxySender.java | 40 ++--
.../apache/inlong/dataproxy/network/Sender.java | 88 ++++-----
.../dataproxy/threads/MetricWorkerThread.java | 35 ++--
.../apache/inlong/dataproxy/utils/ProxyUtils.java | 2 +-
.../dataproxy/utils/ServiceDiscoveryUtils.java | 2 +-
inlong-dataproxy/bin/prepare_env.sh | 2 +-
.../apache/inlong/dataproxy/base/ProxyMessage.java | 30 +--
.../inlong/dataproxy/config/ConfigManager.java | 32 ++--
.../inlong/dataproxy/config/RemoteConfigJson.java | 6 +-
.../dataproxy/config/RemoteConfigManager.java | 2 +-
...iesHolder.java => GroupIdPropertiesHolder.java} | 50 ++---
.../config/remote/ConfigMessageServlet.java | 16 +-
.../dataproxy/consts/AttributeConstants.java | 12 +-
.../inlong/dataproxy/consts/ConfigConstants.java | 2 +-
.../org/apache/inlong/dataproxy/sink/MetaSink.java | 28 +--
.../apache/inlong/dataproxy/sink/PulsarSink.java | 10 +-
.../dataproxy/sink/pulsar/PulsarClientService.java | 8 +-
.../dataproxy/source/DefaultServiceDecoder.java | 36 ++--
.../dataproxy/source/ServerMessageHandler.java | 92 ++++-----
34 files changed, 586 insertions(+), 575 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java b/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
index 857d7f0..8811b14 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
@@ -23,20 +23,20 @@ public interface AttributeConstants {
String KEY_VALUE_SEPARATOR = "=";
/**
- * business id
+ * group id
* unique string id for each business or product
*/
- String BUSINESS_ID = "bid";
+ String GROUP_ID = "groupId";
/**
* interface id
* unique string id for each interface of business
* An interface stand for a kind of data
*/
- String INTERFACE_ID = "tid";
+ String INTERFACE_ID = "streamId";
/**
- * iname is like a tid but used in file protocol(m=xxx)
+ * iname is like a streamId but used in file protocol(m=xxx)
*/
String INAME = "iname";
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
index 4257854..506c4c6 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
@@ -38,8 +38,8 @@ public class TDMsg1 {
private static final int BIN_MSG_SNAPPY_TYPE = 1;
private static final int BIN_MSG_TOTALLEN_OFFSET = 0;
- private static final int BIN_MSG_BID_OFFSET = 5;
- private static final int BIN_MSG_TID_OFFSET = 7;
+ private static final int BIN_MSG_GROUPID_OFFSET = 5;
+ private static final int BIN_MSG_STREAMID_OFFSET = 7;
private static final int BIN_MSG_EXTFIELD_OFFSET = 9;
private static final int BIN_MSG_COUNT_OFFSET = 15;
private static final int BIN_MSG_DATATIME_OFFSET = 11;
@@ -96,7 +96,7 @@ public class TDMsg1 {
private int datalen = 0;
private int msgcnt = 0;
private boolean compress;
- private boolean isNumBid = false;
+ private boolean isNumGroupId = false;
private boolean ischeck = true;
private final Version version;
@@ -634,7 +634,7 @@ public class TDMsg1 {
parsedBinInput = ByteBuffer.wrap(binMsg);
this.createtime = getBinCreatetime(parsedBinInput);
this.msgcnt = getBinMsgCnt(parsedBinInput);
- this.isNumBid = getBinNumFlag(parsedBinInput);
+ this.isNumGroupId = getBinNumFlag(parsedBinInput);
}
}
@@ -736,8 +736,8 @@ public class TDMsg1 {
int totalLen = parsedBinInput.getInt(BIN_MSG_TOTALLEN_OFFSET);
final int msgtype = parsedBinInput.get(BIN_MSG_MSGTYPE_OFFSET);
- int bidNum = parsedBinInput.getShort(BIN_MSG_BID_OFFSET);
- int tidNum = parsedBinInput.getShort(BIN_MSG_TID_OFFSET);
+ int groupIdNum = parsedBinInput.getShort(BIN_MSG_GROUPID_OFFSET);
+ int streamIdNum = parsedBinInput.getShort(BIN_MSG_STREAMID_OFFSET);
int bodyLen = parsedBinInput.getInt(BIN_MSG_BODYLEN_OFFSET);
long dataTime = parsedBinInput.getInt(BIN_MSG_DATATIME_OFFSET);
final int extField = parsedBinInput.getShort(BIN_MSG_EXTFIELD_OFFSET);
@@ -783,11 +783,11 @@ public class TDMsg1 {
break;
}
- //number bid/tid
- boolean isUseNumBid = ((extField & 0x4) == 0x0);
- if (isUseNumBid) {
- commonAttrMap.put(AttributeConstants.BUSINESS_ID, String.valueOf(bidNum));
- commonAttrMap.put(AttributeConstants.INTERFACE_ID, String.valueOf(tidNum));
+ //number groupId/streamId
+ boolean isUseNumGroupId = ((extField & 0x4) == 0x0);
+ if (isUseNumGroupId) {
+ commonAttrMap.put(AttributeConstants.GROUP_ID, String.valueOf(groupIdNum));
+ commonAttrMap.put(AttributeConstants.INTERFACE_ID, String.valueOf(streamIdNum));
}
boolean hasOtherAttr = ((extField & 0x1) == 0x1);
@@ -1099,8 +1099,8 @@ public class TDMsg1 {
return attrcnt;
}
- public boolean isNumBid() {
+ public boolean isNumGroupId() {
checkMode(false);
- return isNumBid;
+ return isNumGroupId;
}
}
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/DefaultMessageSender.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/DefaultMessageSender.java
index f91635d..4254032 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/DefaultMessageSender.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/DefaultMessageSender.java
@@ -42,10 +42,10 @@ public class DefaultMessageSender implements MessageSender {
private static final Logger logger = LoggerFactory.getLogger(DefaultMessageSender.class);
private final Sender sender;
private final SequentialID idGenerator;
- private String bid;
+ private String groupId;
private int msgtype = ConfigConstants.MSG_TYPE;
private boolean isCompress = true;
- private boolean isBidTransfer = false;
+ private boolean isGroupIdTransfer = false;
private boolean isReport = false;
private boolean isSupportLF = false;
private int cpsSize = ConfigConstants.COMPRESS_SIZE;
@@ -54,7 +54,7 @@ public class DefaultMessageSender implements MessageSender {
new ConcurrentHashMap<>();
private final IndexCollectThread indexCol;
- /* Store index <bid_tid,cnt>*/
+ /* Store index <groupId_streamId,cnt>*/
private final Map<String, Long> storeIndex = new ConcurrentHashMap<String, Long>();
private static final AtomicBoolean ManagerFetcherThreadStarted = new AtomicBoolean(false);
@@ -69,12 +69,12 @@ public class DefaultMessageSender implements MessageSender {
isSupportLF = supportLF;
}
- public boolean isBidTransfer() {
- return isBidTransfer;
+ public boolean isGroupIdTransfer() {
+ return isGroupIdTransfer;
}
- public void setBidTransfer(boolean isBidTransfer) {
- this.isBidTransfer = isBidTransfer;
+ public void setGroupIdTransfer(boolean isGroupIdTransfer) {
+ this.isGroupIdTransfer = isGroupIdTransfer;
}
public boolean isReport() {
@@ -109,12 +109,12 @@ public class DefaultMessageSender implements MessageSender {
this.isCompress = isCompress;
}
- public String getBid() {
- return bid;
+ public String getGroupId() {
+ return groupId;
}
- public void setBid(String bid) {
- this.bid = bid;
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
}
public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
@@ -125,7 +125,7 @@ public class DefaultMessageSender implements MessageSender {
ProxyUtils.validClientConfig(configure);
sender = new Sender(configure, selfDefineFactory);
idGenerator = new SequentialID(Utils.getLocalIp());
- bid = configure.getBid();
+ groupId = configure.getGroupId();
indexCol = new IndexCollectThread(storeIndex);
indexCol.start();
@@ -161,8 +161,8 @@ public class DefaultMessageSender implements MessageSender {
ChannelFactory selfDefineFactory) throws Exception {
ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure,
Utils.getLocalIp(), null);
- proxyConfigManager.setBusinessId(configure.getBid());
- ProxyConfigEntry entry = proxyConfigManager.getBidConfigure();
+ proxyConfigManager.setGroupId(configure.getGroupId());
+ ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
DefaultMessageSender sender = cacheSender.get(entry.getClusterId());
if (sender != null) {
return sender;
@@ -195,43 +195,44 @@ public class DefaultMessageSender implements MessageSender {
idGenerator.getNextId()), msgUUID, timeout, timeUnit);
}
- public SendResult sendMessage(byte[] body, String bid, String tid, long dt, String msgUUID,
+ public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
}
- addIndexCnt(bid, tid, 1);
+ addIndexCnt(groupId, streamId, 1);
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport,
- isBidTransfer, dt / 1000, idGenerator.getNextInt(), bid, tid, "");
+ isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, "");
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
if (isCompressEnd) {
- return sender.syncSendMessage(new EncodeObject(body, "bid=" + bid
- + "&tid=" + tid + "&dt=" + dt + "&cp=snappy",
- idGenerator.getNextId(), this.getMsgtype(), true, bid), msgUUID, timeout, timeUnit);
+ return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId
+ + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
+ idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
} else {
- return sender.syncSendMessage(new EncodeObject(body, "bid=" + bid + "&tid=" + tid + "&dt=" + dt,
- idGenerator.getNextId(), this.getMsgtype(), false, bid), msgUUID, timeout, timeUnit);
+ return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId
+ + "&streamId=" + streamId + "&dt=" + dt,
+ idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
}
}
return null;
}
- public SendResult sendMessage(byte[] body, String bid, String tid, long dt, String msgUUID,
+ public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES;
}
- addIndexCnt(bid, tid, 1);
+ addIndexCnt(groupId, streamId, 1);
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
@@ -239,79 +240,79 @@ public class DefaultMessageSender implements MessageSender {
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport,
- isBidTransfer, dt / 1000,
- idGenerator.getNextInt(), bid, tid, attrs.toString());
+ isGroupIdTransfer, dt / 1000,
+ idGenerator.getNextInt(), groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
- attrs.append("&bid=").append(bid).append("&tid=").append(tid).append("&dt=").append(dt);
+ attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
if (isCompressEnd) {
attrs.append("&cp=snappy");
return sender.syncSendMessage(new EncodeObject(body, attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), true, bid), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
} else {
return sender.syncSendMessage(new EncodeObject(body, attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), false, bid), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
}
}
return null;
}
- public SendResult sendMessage(List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
}
- addIndexCnt(bid, tid, bodyList.size());
+ addIndexCnt(groupId, streamId, bodyList.size());
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress, isReport,
- isBidTransfer, dt / 1000,
- idGenerator.getNextInt(), bid, tid, "");
+ isGroupIdTransfer, dt / 1000,
+ idGenerator.getNextInt(), groupId, streamId, "");
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
if (isCompress) {
- return sender.syncSendMessage(new EncodeObject(bodyList, "bid=" + bid + "&tid=" + tid
+ return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
+ "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(),
- idGenerator.getNextId(), this.getMsgtype(), true, bid), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
} else {
- return sender.syncSendMessage(new EncodeObject(bodyList, "bid=" + bid + "&tid=" + tid
+ return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
+ "&dt=" + dt + "&cnt=" + bodyList.size(),
- idGenerator.getNextId(), this.getMsgtype(), false, bid), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
}
}
return null;
}
- public SendResult sendMessage(List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES;
}
- addIndexCnt(bid, tid, bodyList.size());
+ addIndexCnt(groupId, streamId, bodyList.size());
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress, isReport,
- isBidTransfer, dt / 1000,
- idGenerator.getNextInt(), bid, tid, attrs.toString());
+ isGroupIdTransfer, dt / 1000,
+ idGenerator.getNextInt(), groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
- attrs.append("&bid=").append(bid).append("&tid=").append(tid)
+ attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
.append("&dt=").append(dt).append("&cnt=").append(bodyList.size());
if (isCompress) {
attrs.append("&cp=snappy");
return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), true, bid), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
} else {
return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), false, bid), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
}
}
return null;
@@ -325,101 +326,104 @@ public class DefaultMessageSender implements MessageSender {
}
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
- String bid, String tid, long dt, String msgUUID,
+ String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
- addIndexCnt(bid, tid, 1);
+ addIndexCnt(groupId, streamId, 1);
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, this.getMsgtype(), isCompressEnd, isReport,
- isBidTransfer, dt / 1000, idGenerator.getNextInt(),
- bid, tid, "");
+ isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
+ groupId, streamId, "");
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
if (isCompressEnd) {
- sender.asyncSendMessage(new EncodeObject(body, "bid="
- + bid + "&tid=" + tid + "&dt=" + dt + "&cp=snappy",
- idGenerator.getNextId(), this.getMsgtype(), true, bid), callback, msgUUID, timeout, timeUnit);
+ sender.asyncSendMessage(new EncodeObject(body, "groupId="
+ + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
+ idGenerator.getNextId(), this.getMsgtype(), true, groupId),
+ callback, msgUUID, timeout, timeUnit);
} else {
sender.asyncSendMessage(
- new EncodeObject(body, "bid=" + bid + "&tid=" + tid + "&dt=" + dt, idGenerator.getNextId(),
- this.getMsgtype(), false, bid), callback, msgUUID, timeout, timeUnit);
+ new EncodeObject(body, "groupId=" + groupId + "&streamId="
+ + streamId + "&dt=" + dt, idGenerator.getNextId(),
+ this.getMsgtype(), false, groupId), callback,
+ msgUUID, timeout, timeUnit);
}
}
}
public void asyncSendMessage(SendMessageCallback callback,
- byte[] body, String bid, String tid, long dt, String msgUUID,
+ byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
- addIndexCnt(bid, tid, 1);
+ addIndexCnt(groupId, streamId, 1);
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, this.getMsgtype(), isCompressEnd,
- isReport, isBidTransfer, dt / 1000, idGenerator.getNextInt(),
- bid, tid, attrs.toString());
+ isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
+ groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
- attrs.append("&bid=").append(bid).append("&tid=").append(tid).append("&dt=").append(dt);
+ attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
if (isCompressEnd) {
attrs.append("&cp=snappy");
sender.asyncSendMessage(new EncodeObject(body, attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), true, bid),
+ idGenerator.getNextId(), this.getMsgtype(), true, groupId),
callback, msgUUID, timeout, timeUnit);
} else {
sender.asyncSendMessage(new EncodeObject(body, attrs.toString(), idGenerator.getNextId(),
- this.getMsgtype(), false, bid),
+ this.getMsgtype(), false, groupId),
callback, msgUUID, timeout, timeUnit);
}
}
}
public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList,
- String bid, String tid, long dt, String msgUUID,
+ String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
- addIndexCnt(bid, tid, bodyList.size());
+ addIndexCnt(groupId, streamId, bodyList.size());
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, this.getMsgtype(), isCompress,
- isReport, isBidTransfer, dt / 1000, idGenerator.getNextInt(),
- bid, tid, "");
+ isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
+ groupId, streamId, "");
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
if (isCompress) {
sender.asyncSendMessage(
- new EncodeObject(bodyList, "bid=" + bid + "&tid=" + tid
+ new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
+ "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(),
idGenerator.getNextId(), this.getMsgtype(),
- true, bid), callback, msgUUID, timeout, timeUnit);
+ true, groupId), callback, msgUUID, timeout, timeUnit);
} else {
sender.asyncSendMessage(
- new EncodeObject(bodyList, "bid=" + bid + "&tid="
- + tid + "&dt=" + dt + "&cnt=" + bodyList.size(),
+ new EncodeObject(bodyList, "groupId=" + groupId + "&streamId="
+ + streamId + "&dt=" + dt + "&cnt=" + bodyList.size(),
idGenerator.getNextId(), this.getMsgtype(),
- false, bid), callback, msgUUID, timeout, timeUnit);
+ false, groupId), callback, msgUUID, timeout, timeUnit);
}
}
}
public void asyncSendMessage(SendMessageCallback callback,
- List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+ List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
@@ -427,34 +431,34 @@ public class DefaultMessageSender implements MessageSender {
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
- addIndexCnt(bid, tid, bodyList.size());
+ addIndexCnt(groupId, streamId, bodyList.size());
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
-// if (!isBidTransfer)
+// if (!isGroupIdTransfer)
EncodeObject encodeObject = new EncodeObject(bodyList, this.getMsgtype(),
- isCompress, isReport, isBidTransfer, dt / 1000, idGenerator.getNextInt(),
- bid, tid, attrs.toString());
+ isCompress, isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
+ groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
- attrs.append("&bid=").append(bid).append("&tid=").append(tid)
+ attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
.append("&dt=").append(dt).append("&cnt=").append(bodyList.size());
if (isCompress) {
attrs.append("&cp=snappy");
sender.asyncSendMessage(new EncodeObject(bodyList, attrs.toString(), idGenerator.getNextId(),
- this.getMsgtype(), true, bid), callback, msgUUID, timeout, timeUnit);
+ this.getMsgtype(), true, groupId), callback, msgUUID, timeout, timeUnit);
} else {
sender.asyncSendMessage(new EncodeObject(bodyList, attrs.toString(), idGenerator.getNextId(),
- this.getMsgtype(), false, bid), callback, msgUUID, timeout, timeUnit);
+ this.getMsgtype(), false, groupId), callback, msgUUID, timeout, timeUnit);
}
}
}
- private void addIndexCnt(String bid, String tid, long cnt) {
+ private void addIndexCnt(String groupId, String streamId, long cnt) {
try {
- String key = bid + "|" + tid;
+ String key = groupId + "|" + streamId;
if (storeIndex.containsKey(key)) {
long sum = storeIndex.get(key);
storeIndex.put(key, sum + cnt);
@@ -466,8 +470,8 @@ public class DefaultMessageSender implements MessageSender {
}
}
- public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String bid,
- String tid, long dt, int sid, boolean isSupportLF, String msgUUID,
+ public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId,
+ String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
@@ -475,21 +479,21 @@ public class DefaultMessageSender implements MessageSender {
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
- addIndexCnt(bid, tid, bodyList.size());
+ addIndexCnt(groupId, streamId, bodyList.size());
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
- isCompress, isReport, isBidTransfer,
- dt / 1000, sid, bid, tid, attrs.toString(), "data", "");
+ isCompress, isReport, isGroupIdTransfer,
+ dt / 1000, sid, groupId, streamId, attrs.toString(), "data", "");
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessageIndex(encodeObject, callback, msgUUID, timeout, timeUnit);
}
}
- private void asyncSendMetric(FileCallback callback, byte[] body, String bid,
- String tid, long dt, int sid, String ip, String msgUUID,
+ private void asyncSendMetric(FileCallback callback, byte[] body, String groupId,
+ String streamId, long dt, int sid, String ip, String msgUUID,
long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
@@ -498,46 +502,46 @@ public class DefaultMessageSender implements MessageSender {
boolean isCompressEnd = false;
if (msgtype == 7 || msgtype == 8) {
sender.asyncSendMessageIndex(new EncodeObject(body, msgtype, isCompressEnd,
- isReport, isBidTransfer, dt / 1000,
- sid, bid, tid, "", messageKey, ip), callback, msgUUID, timeout, timeUnit);
+ isReport, isGroupIdTransfer, dt / 1000,
+ sid, groupId, streamId, "", messageKey, ip), callback, msgUUID, timeout, timeUnit);
}
}
- public void asyncsendMessageProxy(FileCallback callback, byte[] body, String bid, String tid,
+ public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId,
long dt, int sid, String ip, String msgUUID,
long timeout, TimeUnit timeUnit) throws ProxysdkException {
- asyncSendMetric(callback, body, bid, tid, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
+ asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
}
- public void asyncsendMessageFile(FileCallback callback, byte[] body, String bid,
- String tid, long dt, int sid, String msgUUID,
+ public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId,
+ String streamId, long dt, int sid, String msgUUID,
long timeout, TimeUnit timeUnit) throws ProxysdkException {
- asyncSendMetric(callback, body, bid, tid, dt, sid, "", msgUUID, timeout, timeUnit, "file");
+ asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
}
- public String sendMessageData(List<byte[]> bodyList, String bid,
- String tid, long dt, int sid, boolean isSupportLF, String msgUUID,
+ public String sendMessageData(List<byte[]> bodyList, String groupId,
+ String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES.toString();
}
- addIndexCnt(bid, tid, bodyList.size());
+ addIndexCnt(groupId, streamId, bodyList.size());
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress,
- isReport, isBidTransfer, dt / 1000,
- sid, bid, tid, attrs.toString(), "data", "");
+ isReport, isGroupIdTransfer, dt / 1000,
+ sid, groupId, streamId, attrs.toString(), "data", "");
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
}
return null;
}
- private String sendMetric(byte[] body, String bid, String tid, long dt, int sid, String ip, String msgUUID,
+ private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, String msgUUID,
long timeout, TimeUnit timeUnit, String messageKey) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
@@ -545,20 +549,21 @@ public class DefaultMessageSender implements MessageSender {
}
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, msgtype, false, isReport,
- isBidTransfer, dt / 1000, sid, bid, tid, "", messageKey, ip);
+ isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", messageKey, ip);
return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
}
return null;
}
- public String sendMessageProxy(byte[] body, String bid, String tid, long dt, int sid, String ip, String msgUUID,
+ public String sendMessageProxy(byte[] body, String groupId, String streamId,
+ long dt, int sid, String ip, String msgUUID,
long timeout, TimeUnit timeUnit) {
- return sendMetric(body, bid, tid, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
+ return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
}
- public String sendMessageFile(byte[] body, String bid, String tid, long dt, int sid, String msgUUID,
+ public String sendMessageFile(byte[] body, String groupId, String streamId, long dt, int sid, String msgUUID,
long timeout, TimeUnit timeUnit) {
- return sendMetric(body, bid, tid, dt, sid, "", msgUUID, timeout, timeUnit, "file");
+ return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
}
private void shutdownInternalThreads() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/MessageSender.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/MessageSender.java
index fc79f66..ca17b54 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/MessageSender.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/MessageSender.java
@@ -28,7 +28,7 @@ public interface MessageSender {
/**
* This method provides a synchronized function which you want to send data
- * with extra attributes except bid,tid,dt,etc
+ * with extra attributes except groupId,streamId,dt,etc
* This method is deprecated,we suggest you don't use it.
*
* @param body The data will be sent
@@ -44,19 +44,19 @@ public interface MessageSender {
* @param body The data will be sent
*
*/
- public SendResult sendMessage(byte[] body, String bid, String tid, long dt, String msgUUID,
+ public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit);
/**
* This method provides a synchronized function which you want to send data without packing
- * with extra attributes except bid,tid,dt,etc
+ * with extra attributes except groupId,streamId,dt,etc
*
* @param body The data will be sent
*
* @param extraAttrMap The attributes you want to add,
* and each element of extraAttrMap contains a pair like <attrKey,attrValue>
*/
- public SendResult sendMessage(byte[] body, String bid, String tid, long dt, String msgUUID,
+ public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
/**
@@ -65,24 +65,24 @@ public interface MessageSender {
*
* @param bodyList The data will be sent,which is a collection consisting of byte arrays
*/
- public SendResult sendMessage(List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit);
/**
* This method provides a synchronized function which you want to send data with packing
- * with extra attributes except bid,tid,dt,etc
+ * with extra attributes except groupId,streamId,dt,etc
*
*
* @param bodyList The data will be sent,which is a collection consisting of byte arrays
* @param extraAttrMap The attributes you want to add,
* and each element of extraAttrMap contains a pair like <attrKey,attrValue>
*/
- public SendResult sendMessage(List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
/**
* This method provides an asynchronized function which you want to send data
- * with extra attributes except bid,tid,dt,etc
+ * with extra attributes except groupId,streamId,dt,etc
* This method is deprecated,we suggest you don't use it.
*
*
@@ -96,7 +96,7 @@ public interface MessageSender {
/**
* This method provides a synchronized function which you want to send data without packing
- * with extra attributes except bid,tid,dt,etc
+ * with extra attributes except groupId,streamId,dt,etc
*
*
* @param body The data will be sent
@@ -104,7 +104,7 @@ public interface MessageSender {
* and each element of extraAttrMap contains a pair like <attrKey,attrValue>
*/
public void asyncSendMessage(SendMessageCallback callback,
- byte[] body, String bid, String tid, long dt, String msgUUID,
+ byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap) throws ProxysdkException;
@@ -116,7 +116,7 @@ public interface MessageSender {
* @param body The data will be sent
*/
public void asyncSendMessage(SendMessageCallback callback,
- byte[] body, String bid, String tid, long dt, String msgUUID,
+ byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit) throws ProxysdkException;
/**
@@ -126,12 +126,12 @@ public interface MessageSender {
* @param bodyList The data will be sent,which is a collection consisting of byte arrays
*/
public void asyncSendMessage(SendMessageCallback callback,
- List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+ List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit) throws ProxysdkException;
/**
* This method provides an asynchronized function which you want to send data with packing
- * with extra attributes except bid,tid,dt,etc
+ * with extra attributes except groupId,streamId,dt,etc
*
*
* @param bodyList The data will be sent,which is a collection consisting of byte arrays
@@ -139,7 +139,7 @@ public interface MessageSender {
* element of extraAttrMap contains a pair like <attrKey,attrValue>
*/
public void asyncSendMessage(SendMessageCallback callback,
- List<byte[]> bodyList, String bid, String tid, long dt, String msgUUID,
+ List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap) throws ProxysdkException;
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
index 5e437ee..43e9d40 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
@@ -37,7 +37,7 @@ public class ProxyClientConfig {
private int proxyUpdateIntervalMinutes;
private int proxyUpdateMaxRetry;
private String netTag;
- private String bid;
+ private String groupId;
private boolean isFile = false;
private boolean isLocalVisit = true;
private boolean isNeedDataEncry = false;
@@ -77,10 +77,10 @@ public class ProxyClientConfig {
private boolean cleanHttpCacheWhenClosing = false;
// config for metric collector
- // whether use bid as key for metric, default is true
- private boolean useBidAsKey = true;
- // whether use tid as key for metric, default is true
- private boolean useTidAsKey = true;
+ // whether use groupId as key for metric, default is true
+ private boolean useGroupIdAsKey = true;
+ // whether use StreamId as key for metric, default is true
+ private boolean useStreamIdAsKey = true;
// whether use localIp as key for metric, default is true
private boolean useLocalIpAsKey = true;
// metric collection interval, default is 1 mins in milliseconds.
@@ -88,12 +88,12 @@ public class ProxyClientConfig {
// max cache time for proxy config.
private long maxProxyCacheTimeInMs = 30 * 60 * 1000;
- // metric bid
- private String metricBid = "inlong_sla_metric";
+ // metric groupId
+ private String metricGroupId = "inlong_sla_metric";
/*pay attention to the last url parameter ip*/
public ProxyClientConfig(String localHost, boolean isLocalVisit, String managerIp,
- int managerPort, String bid, String netTag) throws ProxysdkException {
+ int managerPort, String groupId, String netTag) throws ProxysdkException {
if (Utils.isBlank(localHost)) {
throw new ProxysdkException("localHost is blank!");
}
@@ -101,7 +101,7 @@ public class ProxyClientConfig {
throw new IllegalArgumentException("managerIp is Blank!");
}
this.proxyIPServiceURL = "http://" + managerIp + ":" + managerPort + "/api/inlong/manager/openapi/dataproxy/getIpList";
- this.bid = bid;
+ this.groupId = groupId;
this.netTag = netTag;
this.isLocalVisit = isLocalVisit;
this.managerPort = managerPort;
@@ -137,12 +137,12 @@ public class ProxyClientConfig {
isFile = file;
}
- public String getBid() {
- return bid;
+ public String getGroupId() {
+ return groupId;
}
- public void setBid(String bid) {
- this.bid = bid;
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
}
public int getManagerPort() {
@@ -369,20 +369,20 @@ public class ProxyClientConfig {
this.cleanHttpCacheWhenClosing = cleanHttpCacheWhenClosing;
}
- public boolean isUseBidAsKey() {
- return useBidAsKey;
+ public boolean isUseGroupIdAsKey() {
+ return useGroupIdAsKey;
}
- public void setUseBidAsKey(boolean useBidAsKey) {
- this.useBidAsKey = useBidAsKey;
+ public void setUseGroupIdAsKey(boolean useGroupIdAsKey) {
+ this.useGroupIdAsKey = useGroupIdAsKey;
}
- public boolean isUseTidAsKey() {
- return useTidAsKey;
+ public boolean isUseStreamIdAsKey() {
+ return useStreamIdAsKey;
}
- public void setUseTidAsKey(boolean useTidAsKey) {
- this.useTidAsKey = useTidAsKey;
+ public void setUseStreamIdAsKey(boolean useStreamIdAsKey) {
+ this.useStreamIdAsKey = useStreamIdAsKey;
}
public boolean isUseLocalIpAsKey() {
@@ -401,12 +401,12 @@ public class ProxyClientConfig {
this.metricIntervalInMs = metricIntervalInMs;
}
- public String getMetricBid() {
- return metricBid;
+ public String getMetricGroupId() {
+ return metricGroupId;
}
- public void setMetricBid(String metricBid) {
- this.metricBid = metricBid;
+ public void setMetricGroupId(String metricGroupId) {
+ this.metricGroupId = metricGroupId;
}
public long getMaxProxyCacheTimeInMs() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/EncodeObject.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/EncodeObject.java
index e967965..bdd5e78 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/EncodeObject.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/EncodeObject.java
@@ -39,15 +39,15 @@ public class EncodeObject {
private long packageTime = System.currentTimeMillis();
private int cnt = -1;
private boolean isReport = false;
- private boolean isBidTransfer = false;
+ private boolean isGroupIdTransfer = false;
private boolean isSupportLF = false;
private boolean isAuth = false;
private boolean isEncrypt = false;
private boolean isCompress = true;
- private int bidNum;
- private int tidNum;
- private String bid;
- private String tid;
+ private int groupIdNum;
+ private int streamIdNum;
+ private String groupId;
+ private String streamId;
private short load;
private String userName = "";
private String secretKey = "";
@@ -85,89 +85,92 @@ public class EncodeObject {
// used for bytes initializtion,msgtype=3/5
public EncodeObject(byte[] bodyBytes, String attributes, String messageId,
- int msgtype, boolean isCompress, final String bid) {
+ int msgtype, boolean isCompress, final String groupId) {
this.bodyBytes = bodyBytes;
this.messageId = messageId;
this.attributes = attributes + "&messageId=" + messageId;
this.msgtype = msgtype;
- this.bid = bid;
+ this.groupId = groupId;
this.isCompress = isCompress;
}
// used for bodylist initializtion,msgtype=3/5
public EncodeObject(List<byte[]> bodyList, String attributes, String messageId,
- int msgtype, boolean isCompress, final String bid) {
+ int msgtype, boolean isCompress, final String groupId) {
this.bodylist = bodyList;
this.messageId = messageId;
this.attributes = attributes + "&messageId=" + messageId;
this.msgtype = msgtype;
- this.bid = bid;
+ this.groupId = groupId;
this.isCompress = isCompress;
}
// used for bytes initializtion,msgtype=7/8
public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress, boolean isReport,
- boolean isBidTransfer, long dt, long seqId, String bid, String tid, String commonattr) {
+ boolean isGroupIdTransfer, long dt, long seqId, String groupId,
+ String streamId, String commonattr) {
this.bodyBytes = bodyBytes;
this.msgtype = msgtype;
this.isCompress = isCompress;
this.isReport = isReport;
this.dt = dt;
- this.isBidTransfer = isBidTransfer;
+ this.isGroupIdTransfer = isGroupIdTransfer;
this.commonattr = commonattr;
this.messageId = String.valueOf(seqId);
- this.bid = bid;
- this.tid = tid;
+ this.groupId = groupId;
+ this.streamId = streamId;
}
// used for bodylist initializtion,msgtype=7/8
public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
- boolean isReport, boolean isBidTransfer, long dt,
- long seqId, String bid, String tid, String commonattr) {
+ boolean isReport, boolean isGroupIdTransfer, long dt,
+ long seqId, String groupId, String streamId, String commonattr) {
this.bodylist = bodyList;
this.msgtype = msgtype;
this.isCompress = isCompress;
this.isReport = isReport;
this.dt = dt;
- this.isBidTransfer = isBidTransfer;
+ this.isGroupIdTransfer = isGroupIdTransfer;
this.commonattr = commonattr;
this.messageId = String.valueOf(seqId);
- this.bid = bid;
- this.tid = tid;
+ this.groupId = groupId;
+ this.streamId = streamId;
}
// file agent, used for bytes initializtion,msgtype=7/8
public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress,
- boolean isReport, boolean isBidTransfer, long dt,
- long seqId, String bid, String tid, String commonattr, String messageKey, String proxyIp) {
+ boolean isReport, boolean isGroupIdTransfer, long dt,
+ long seqId, String groupId, String streamId, String commonattr,
+ String messageKey, String proxyIp) {
this.bodyBytes = bodyBytes;
this.msgtype = msgtype;
this.isCompress = isCompress;
this.isReport = isReport;
this.dt = dt;
- this.isBidTransfer = isBidTransfer;
+ this.isGroupIdTransfer = isGroupIdTransfer;
this.commonattr = commonattr;
this.messageId = String.valueOf(seqId);
- this.bid = bid;
- this.tid = tid;
+ this.groupId = groupId;
+ this.streamId = streamId;
this.messageKey = messageKey;
this.proxyIp = proxyIp;
}
// file agent, used for bodylist initializtion,msgtype=7/8
public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
- boolean isReport, boolean isBidTransfer, long dt,
- long seqId, String bid, String tid, String commonattr, String messageKey, String proxyIp) {
+ boolean isReport, boolean isGroupIdTransfer, long dt,
+ long seqId, String groupId, String streamId, String commonattr,
+ String messageKey, String proxyIp) {
this.bodylist = bodyList;
this.msgtype = msgtype;
this.isCompress = isCompress;
this.isReport = isReport;
this.dt = dt;
- this.isBidTransfer = isBidTransfer;
+ this.isGroupIdTransfer = isGroupIdTransfer;
this.commonattr = commonattr;
this.messageId = String.valueOf(seqId);
- this.bid = bid;
- this.tid = tid;
+ this.groupId = groupId;
+ this.streamId = streamId;
this.messageKey = messageKey;
this.proxyIp = proxyIp;
}
@@ -180,12 +183,12 @@ public class EncodeObject {
this.msgUUID = msgUUID;
}
- public boolean isBidTransfer() {
- return isBidTransfer;
+ public boolean isGroupIdTransfer() {
+ return isGroupIdTransfer;
}
- public void setBidTransfer(boolean isBidTransfer) {
- this.isBidTransfer = isBidTransfer;
+ public void setGroupIdTransfer(boolean isGroupIdTransfer) {
+ this.isGroupIdTransfer = isGroupIdTransfer;
}
public short getLoad() {
@@ -196,20 +199,20 @@ public class EncodeObject {
this.load = load;
}
- public String getBid() {
- return bid;
+ public String getGroupId() {
+ return groupId;
}
- public void setBid(String bid) {
- this.bid = bid;
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
}
- public String getTid() {
- return tid;
+ public String getStreamId() {
+ return streamId;
}
- public void setTid(String tid) {
- this.tid = tid;
+ public void setStreamId(String streamId) {
+ this.streamId = streamId;
}
public void setMsgtype(int msgtype) {
@@ -262,20 +265,20 @@ public class EncodeObject {
this.encryptEntry = encryptEntry;
}
- public int getBidNum() {
- return bidNum;
+ public int getGroupIdNum() {
+ return groupIdNum;
}
- public void setBidNum(int bidNum) {
- this.bidNum = bidNum;
+ public void setGroupIdNum(int groupIdNum) {
+ this.groupIdNum = groupIdNum;
}
- public int getTidNum() {
- return tidNum;
+ public int getStreamIdNum() {
+ return streamIdNum;
}
- public void setTidNum(int tidNum) {
- this.tidNum = tidNum;
+ public void setStreamIdNum(int streamIdNum) {
+ this.streamIdNum = streamIdNum;
}
public long getDt() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/ProtocolEncoder.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/ProtocolEncoder.java
index 7edd389..456156b 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/ProtocolEncoder.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/codec/ProtocolEncoder.java
@@ -136,11 +136,11 @@ public class ProtocolEncoder extends OneToOneEncoder {
body = EncryptUtil.desEncrypt(body, encryptInfo.getDesKey());
}
}
- if (!object.isBidTransfer()) {
+ if (!object.isGroupIdTransfer()) {
if (Utils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
- endAttr = (endAttr + "bid=" + object.getBid() + "&tid=" + object.getTid());
+ endAttr = (endAttr + "groupId=" + object.getGroupId() + "&streamId=" + object.getStreamId());
}
if (Utils.isNotBlank(object.getMsgUUID())) {
if (Utils.isNotBlank(endAttr)) {
@@ -159,12 +159,12 @@ public class ProtocolEncoder extends OneToOneEncoder {
totalLength = totalLength + body.length + endAttr.getBytes("utf8").length;
buf.writeInt(totalLength);
buf.writeByte(msgType);
- buf.writeShort(object.getBidNum());
- buf.writeShort(object.getTidNum());
+ buf.writeShort(object.getGroupIdNum());
+ buf.writeShort(object.getStreamIdNum());
String bitStr = object.isSupportLF() ? "1" : "0";
bitStr += (object.getMessageKey().equals("minute")) ? "1" : "0";
bitStr += (object.getMessageKey().equals("file")) ? "1" : "0";
- bitStr += !object.isBidTransfer() ? "1" : "0";
+ bitStr += !object.isGroupIdTransfer() ? "1" : "0";
bitStr += object.isReport() ? "1" : "0";
bitStr += "0";
buf.writeShort(Integer.parseInt(bitStr, 2));
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/EncryptConfigEntry.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/EncryptConfigEntry.java
index 82b68a1..9f5b8d4 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/EncryptConfigEntry.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/EncryptConfigEntry.java
@@ -138,7 +138,7 @@ public class EncryptConfigEntry implements java.io.Serializable {
}
public String toString() {
- return "{\"version\":\"" + version + "\",\"public_key\":\"" + pubKey + "\",\"bid\":\"" + userName + "\"}";
+ return "{\"version\":\"" + version + "\",\"public_key\":\"" + pubKey + "\",\"groupId\":\"" + userName + "\"}";
}
}
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigEntry.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigEntry.java
index 2a994fd..846b057 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigEntry.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigEntry.java
@@ -22,11 +22,11 @@ import java.util.Map;
public class ProxyConfigEntry implements java.io.Serializable {
private String clusterId;
- private String bid;
+ private String groupId;
private int size;
private Map<String, HostInfo> hostMap;
- private int bidNum;
- private Map<String, Integer> tidNumMap;
+ private int groupIdNum;
+ private Map<String, Integer> streamIdNumMap;
private int load;
private int switchStat;
private boolean isInterVisit;
@@ -39,17 +39,17 @@ public class ProxyConfigEntry implements java.io.Serializable {
this.load = load;
}
- public int getBidNum() {
- return bidNum;
+ public int getGroupIdNum() {
+ return groupIdNum;
}
- public Map<String, Integer> getTidNumMap() {
- return tidNumMap;
+ public Map<String, Integer> getStreamIdNumMap() {
+ return streamIdNumMap;
}
- public void setBidNumAndTidNumMap(int bidNum, Map<String, Integer> tidNumMap) {
- this.bidNum = bidNum;
- this.tidNumMap = tidNumMap;
+ public void setGroupIdNumAndStreamIdNumMap(int groupIdNum, Map<String, Integer> streamIdNumMap) {
+ this.groupIdNum = groupIdNum;
+ this.streamIdNumMap = streamIdNumMap;
}
public int getSwitchStat() {
@@ -73,12 +73,12 @@ public class ProxyConfigEntry implements java.io.Serializable {
return size;
}
- public String getBid() {
- return bid;
+ public String getGroupId() {
+ return groupId;
}
- public void setBid(String bid) {
- this.bid = bid;
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
}
public boolean isInterVisit() {
@@ -91,8 +91,9 @@ public class ProxyConfigEntry implements java.io.Serializable {
@Override
public String toString() {
- return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", bsn=" + bidNum + ", tsnMap=" + tidNumMap
- + ", size=" + size + ", isInterVisit=" + isInterVisit + ", bid=" + bid
+ return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", bsn="
+ + groupIdNum + ", tsnMap=" + streamIdNumMap
+ + ", size=" + size + ", isInterVisit=" + isInterVisit + ", groupId=" + groupId
+ ", switch=" + switchStat + "]";
}
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
index e61b2e2..f846117 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
@@ -82,7 +82,7 @@ public class ProxyConfigManager extends Thread {
private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>();
/*the status of the cluster.if this value is changed,we need rechoose three proxy*/
private int oldStat = 0;
- private String businessId;
+ private String groupId;
private final ProxyClientConfig clientConfig;
private final String localIP;
private String localMd5;
@@ -101,12 +101,12 @@ public class ProxyConfigManager extends Thread {
this.clientManager = clientManager;
}
- public String getBusinessId() {
- return businessId;
+ public String getGroupId() {
+ return groupId;
}
- public void setBusinessId(String businessId) {
- this.businessId = businessId;
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
}
public void shutDown() {
@@ -158,7 +158,7 @@ public class ProxyConfigManager extends Thread {
if (diffTime < clientConfig.getMaxProxyCacheTimeInMs()) {
JsonReader reader = new JsonReader(new FileReader(configCachePath));
ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader, ProxyConfigEntry.class);
- logger.info("{} has a backup! {}", businessId, proxyConfigEntry);
+ logger.info("{} has a backup! {}", groupId, proxyConfigEntry);
return proxyConfigEntry;
}
} catch (Exception ex) {
@@ -199,14 +199,14 @@ public class ProxyConfigManager extends Thread {
}
/**
- * get bid config
+ * get groupId config
*
* @return proxyConfigEntry
* @throws Exception
*/
- public ProxyConfigEntry getBidConfigure() throws Exception {
+ public ProxyConfigEntry getGroupIdConfigure() throws Exception {
ProxyConfigEntry proxyEntry;
- String configAddr = clientConfig.getConfStoreBasePath() + businessId;
+ String configAddr = clientConfig.getConfStoreBasePath() + groupId;
if (this.clientConfig.isReadProxyIPFromLocal()) {
configAddr = configAddr + ".local";
proxyEntry = getLocalProxyListFromFile(configAddr);
@@ -242,7 +242,7 @@ public class ProxyConfigManager extends Thread {
localMd5 = calcHostInfoMd5(proxyInfoList);
}
ProxyConfigEntry proxyEntry = null;
- String configAddr = clientConfig.getConfStoreBasePath() + businessId;
+ String configAddr = clientConfig.getConfStoreBasePath() + groupId;
if (clientConfig.isReadProxyIPFromLocal()) {
configAddr = configAddr + ".local";
proxyEntry = getLocalProxyListFromFile(configAddr);
@@ -298,8 +298,8 @@ public class ProxyConfigManager extends Thread {
if (proxyEntry.getSize() != 0) {
/* Initialize the current proxy information list first. */
clientManager.setLoadThreshold(proxyEntry.getLoad());
- clientManager.setBidNum(proxyEntry.getBidNum());
- clientManager.setTidMap(proxyEntry.getTidNumMap());
+ clientManager.setGroupIdNum(proxyEntry.getGroupIdNum());
+ clientManager.setStreamIdMap(proxyEntry.getStreamIdNumMap());
List<HostInfo> newProxyInfoList = new ArrayList<HostInfo>();
for (Map.Entry<String, HostInfo> entry : proxyEntry.getHostMap().entrySet()) {
@@ -559,9 +559,9 @@ public class ProxyConfigManager extends Thread {
throw new Exception("Read local proxyList File failure by " + filePath + ", reason is " + e.getCause());
}
- int bidNum = 0;
+ int groupIdNum = 0;
if (localProxyAddrJson.has("bsn")) {
- bidNum = localProxyAddrJson.get("bsn").getAsInt();
+ groupIdNum = localProxyAddrJson.get("bsn").getAsInt();
}
int load = ConfigConstants.LOAD_THRESHOLD;
@@ -570,15 +570,15 @@ public class ProxyConfigManager extends Thread {
load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
}
ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
- proxyEntry.setBid(clientConfig.getBid());
+ proxyEntry.setGroupId(clientConfig.getGroupId());
boolean isInterVisit = checkValidProxy(filePath, localProxyAddrJson);
proxyEntry.setInterVisit(isInterVisit);
Map<String, HostInfo> hostMap = getHostInfoMap(
localProxyAddrJson);
proxyEntry.setHostMap(hostMap);
proxyEntry.setSwitchStat(0);
- Map<String, Integer> tidMap = getTidMap(localProxyAddrJson);
- proxyEntry.setBidNumAndTidNumMap(bidNum, tidMap);
+ Map<String, Integer> streamIdMap = getStreamIdMap(localProxyAddrJson);
+ proxyEntry.setGroupIdNumAndStreamIdNumMap(groupIdNum, streamIdMap);
proxyEntry.setLoad(load);
if (localProxyAddrJson.has("cluster_id")) {
proxyEntry.setClusterId(localProxyAddrJson.get("cluster_id").getAsString());
@@ -624,18 +624,18 @@ public class ProxyConfigManager extends Thread {
return hostMap;
}
- private Map<String, Integer> getTidMap(JsonObject localProxyAddrJson) {
- Map<String, Integer> tidMap = new HashMap<String, Integer>();
+ private Map<String, Integer> getStreamIdMap(JsonObject localProxyAddrJson) {
+ Map<String, Integer> streamIdMap = new HashMap<String, Integer>();
if (localProxyAddrJson.has("tsn")) {
- JsonArray jsontid = localProxyAddrJson.getAsJsonArray("tsn");
- for (int i = 0; i < jsontid.size(); i++) {
- JsonObject jsonItem = jsontid.get(i).getAsJsonObject();
- if (jsonItem != null && jsonItem.has("tid") && jsonItem.has("sn")) {
- tidMap.put(jsonItem.get("tid").getAsString(), jsonItem.get("sn").getAsInt());
+ JsonArray jsonStreamId = localProxyAddrJson.getAsJsonArray("tsn");
+ for (int i = 0; i < jsonStreamId.size(); i++) {
+ JsonObject jsonItem = jsonStreamId.get(i).getAsJsonObject();
+ if (jsonItem != null && jsonItem.has("streamId") && jsonItem.has("sn")) {
+ streamIdMap.put(jsonItem.get("streamId").getAsString(), jsonItem.get("sn").getAsInt());
}
}
}
- return tidMap;
+ return streamIdMap;
}
private boolean checkValidProxy(String filePath, JsonObject localProxyAddrJson) throws Exception {
@@ -673,9 +673,9 @@ public class ProxyConfigManager extends Thread {
if (hostMap == null) {
return null;
}
- int bidNum = 0;
+ int groupIdNum = 0;
if (jsonRes.has("bsn")) {
- bidNum = jsonRes.get("bsn").getAsInt();
+ groupIdNum = jsonRes.get("bsn").getAsInt();
}
int load = ConfigConstants.LOAD_THRESHOLD;
if (jsonRes.has("load")) {
@@ -683,12 +683,12 @@ public class ProxyConfigManager extends Thread {
load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
}
ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
- proxyEntry.setBid(clientConfig.getBid());
+ proxyEntry.setGroupId(clientConfig.getGroupId());
proxyEntry.setInterVisit(true);
proxyEntry.setHostMap(hostMap);
proxyEntry.setSwitchStat(0);
- Map<String, Integer> tidMap = getTidMap(jsonRes);
- proxyEntry.setBidNumAndTidNumMap(bidNum, tidMap);
+ Map<String, Integer> streamIdMap = getStreamIdMap(jsonRes);
+ proxyEntry.setGroupIdNumAndStreamIdNumMap(groupIdNum, streamIdMap);
proxyEntry.setLoad(load);
if (jsonRes.has("cluster_id")) {
proxyEntry.setClusterId(jsonRes.get("cluster_id").getAsString());
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/demo/Event.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/demo/Event.java
index b1c22b5..4d6ceeb 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/demo/Event.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/demo/Event.java
@@ -22,26 +22,26 @@ import java.util.ArrayList;
public class Event {
private byte[] body;
- private String bid;
- private String tid;
+ private String groupId;
+ private String streamId;
private long dt;
private int tryTimes = 0;
ArrayList<byte[]> bodylist = new ArrayList<byte[]>();
- public Event(byte[] body, String bid, String tid, long dt) {
+ public Event(byte[] body, String groupId, String streamId, long dt) {
super();
this.body = body;
- this.bid = bid;
- this.tid = tid;
+ this.groupId = groupId;
+ this.streamId = streamId;
this.dt = dt;
this.setTryTimes(0);
}
- public Event(ArrayList<byte[]> bodylist, String bid, String tid, long dt) {
+ public Event(ArrayList<byte[]> bodylist, String groupId, String streamId, long dt) {
super();
this.bodylist = bodylist;
- this.bid = bid;
- this.tid = tid;
+ this.groupId = groupId;
+ this.streamId = streamId;
this.dt = dt;
this.setTryTimes(0);
}
@@ -62,20 +62,20 @@ public class Event {
this.body = body;
}
- public String getBid() {
- return bid;
+ public String getGroupId() {
+ return groupId;
}
- public void setBid(String bid) {
- this.bid = bid;
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
}
- public String getTid() {
- return tid;
+ public String getStreamId() {
+ return streamId;
}
- public void setTid(String tid) {
- this.tid = tid;
+ public void setStreamId(String streamId) {
+ this.streamId = streamId;
}
public long getDt() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/http/InternalHttpSender.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/http/InternalHttpSender.java
index 659a70b..e0b7968 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/http/InternalHttpSender.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/http/InternalHttpSender.java
@@ -87,16 +87,16 @@ public class InternalHttpSender {
* construct header
*
* @param bodies
- * @param bid
- * @param tid
+ * @param groupId
+ * @param streamId
* @param dt
* @return
*/
private ArrayList<BasicNameValuePair> getHeaders(List<String> bodies,
- String bid, String tid, long dt) {
+ String groupId, String streamId, long dt) {
ArrayList<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
- params.add(new BasicNameValuePair("bid", bid));
- params.add(new BasicNameValuePair("tid", tid));
+ params.add(new BasicNameValuePair("groupId", groupId));
+ params.add(new BasicNameValuePair("streamId", streamId));
params.add(new BasicNameValuePair("dt", String.valueOf(dt)));
params.add(new BasicNameValuePair("body", StringUtils.join(bodies, "\n")));
params.add(new BasicNameValuePair("cnt", String.valueOf(bodies.size())));
@@ -137,8 +137,8 @@ public class InternalHttpSender {
HttpMessage httpMessage = messageCache.poll();
if (httpMessage != null) {
SendResult result = sendMessageWithHostInfo(
- httpMessage.getBodies(), httpMessage.getBid(),
- httpMessage.getTid(), httpMessage.getDt(),
+ httpMessage.getBodies(), httpMessage.getGroupId(),
+ httpMessage.getStreamId(), httpMessage.getDt(),
httpMessage.getTimeout(), httpMessage.getTimeUnit());
httpMessage.getCallback().onMessageAck(result);
}
@@ -168,8 +168,8 @@ public class InternalHttpSender {
* send request by http
*
* @param bodies
- * @param bid
- * @param tid
+ * @param groupId
+ * @param streamId
* @param dt
* @param timeout
* @param timeUnit
@@ -177,7 +177,7 @@ public class InternalHttpSender {
* @return
* @throws Exception
*/
- private SendResult sendByHttp(List<String> bodies, String bid, String tid, long dt,
+ private SendResult sendByHttp(List<String> bodies, String groupId, String streamId, long dt,
long timeout, TimeUnit timeUnit, HostInfo hostInfo) throws Exception {
HttpPost httpPost = null;
CloseableHttpResponse response = null;
@@ -191,7 +191,7 @@ public class InternalHttpSender {
httpPost = new HttpPost(url);
httpPost.setHeader(HttpHeaders.CONNECTION, "close");
httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
- ArrayList<BasicNameValuePair> contents = getHeaders(bodies, bid, tid, dt);
+ ArrayList<BasicNameValuePair> contents = getHeaders(bodies, groupId, streamId, dt);
String s = URLEncodedUtils.format(contents, StandardCharsets.UTF_8);
logger.info("encode string is {}", s);
httpPost.setEntity(new StringEntity(s));
@@ -233,21 +233,21 @@ public class InternalHttpSender {
* send message with host info
*
* @param bodies
- * @param bid
- * @param tid
+ * @param groupId
+ * @param streamId
* @param dt
* @param timeout
* @param timeUnit
* @return
*/
- public SendResult sendMessageWithHostInfo(List<String> bodies, String bid, String tid, long dt,
+ public SendResult sendMessageWithHostInfo(List<String> bodies, String groupId, String streamId, long dt,
long timeout, TimeUnit timeUnit) {
List<HostInfo> randomHostList = getRandomHostInfo();
Exception tmpException = null;
for (HostInfo hostInfo : randomHostList) {
try {
- return sendByHttp(bodies, bid, tid, dt, timeout, timeUnit, hostInfo);
+ return sendByHttp(bodies, groupId, streamId, dt, timeout, timeUnit, hostInfo);
} catch (Exception exception) {
tmpException = exception;
logger.debug("error while sending data, resending it", exception);
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/metric/MessageRecord.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/metric/MessageRecord.java
index 8ac9153..fed67ed 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/metric/MessageRecord.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/metric/MessageRecord.java
@@ -27,14 +27,15 @@ public class MessageRecord {
private final long startTime;
private final long dt;
- private final String bid;
- private final String tid;
+ private final String groupId;
+ private final String streamId;
private final String localIp;
private final long packTime;
- public MessageRecord(String bid, String tid, String localIp, String msgId, long dt, long packTime, int msgCount) {
- this.bid = bid;
- this.tid = tid;
+ public MessageRecord(String groupId, String streamId, String localIp,
+ String msgId, long dt, long packTime, int msgCount) {
+ this.groupId = groupId;
+ this.streamId = streamId;
this.localIp = localIp;
this.msgUUID = msgId;
this.msgCount = msgCount;
@@ -63,12 +64,12 @@ public class MessageRecord {
return dt;
}
- public String getBid() {
- return bid;
+ public String getGroupId() {
+ return groupId;
}
- public String getTid() {
- return tid;
+ public String getStreamId() {
+ return streamId;
}
public String getLocalIp() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/ClientMgr.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/ClientMgr.java
index 1b87169..aa32d0d 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/ClientMgr.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/ClientMgr.java
@@ -75,9 +75,9 @@ public class ClientMgr {
private SendHBThread sendHBThread;
private ProxyConfigManager ipManager;
- private int bidNum = 0;
- private String bid = "";
- private Map<String, Integer> tidMap = new HashMap<String, Integer>();
+ private int groupIdNum = 0;
+ private String groupId = "";
+ private Map<String, Integer> streamIdMap = new HashMap<String, Integer>();
private int loadThreshold;
private int loadCycle = 0;
private static final int[] weight = {
@@ -101,28 +101,28 @@ public class ClientMgr {
this.loadThreshold = loadThreshold;
}
- public int getBidNum() {
- return bidNum;
+ public int getGroupIdNum() {
+ return groupIdNum;
}
- public void setBidNum(int bidNum) {
- this.bidNum = bidNum;
+ public void setGroupIdNum(int groupIdNum) {
+ this.groupIdNum = groupIdNum;
}
- public String getBid() {
- return bid;
+ public String getGroupId() {
+ return groupId;
}
- public void setBid(String bid) {
- this.bid = bid;
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
}
- public Map<String, Integer> getTidMap() {
- return tidMap;
+ public Map<String, Integer> getStreamIdMap() {
+ return streamIdMap;
}
- public void setTidMap(Map<String, Integer> tidMap) {
- this.tidMap = tidMap;
+ public void setStreamIdMap(Map<String, Integer> streamIdMap) {
+ this.streamIdMap = streamIdMap;
}
public EncryptConfigEntry getEncryptConfigEntry() {
@@ -195,9 +195,9 @@ public class ClientMgr {
/* ready to Start the thread which refreshes the proxy list. */
ipManager = new ProxyConfigManager(configure, Utils.getLocalIp(), this);
ipManager.setName("proxyConfigManager");
- if (configure.getBid() != null) {
- ipManager.setBusinessId(configure.getBid());
- bid = configure.getBid();
+ if (configure.getGroupId() != null) {
+ ipManager.setGroupId(configure.getGroupId());
+ groupId = configure.getGroupId();
}
/*
@@ -222,8 +222,8 @@ public class ClientMgr {
this.sendHBThread.start();
}
- public ProxyConfigEntry getBidConfigureInfo() throws Exception {
- return ipManager.getBidConfigure();
+ public ProxyConfigEntry getGroupIdConfigureInfo() throws Exception {
+ return ipManager.getGroupIdConfigure();
}
private boolean initConnection(HostInfo host) {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpMessage.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpMessage.java
index 712b113..4402f79 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpMessage.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpMessage.java
@@ -27,18 +27,18 @@ import org.apache.inlong.dataproxy.SendMessageCallback;
* http message for cache.
*/
public class HttpMessage {
- private final String bid;
- private final String tid;
+ private final String groupId;
+ private final String streamId;
private final List<String> bodies;
private final SendMessageCallback callback;
private final long dt;
private final long timeout;
private final TimeUnit timeUnit;
- public HttpMessage(List<String> bodies, String bid, String tid, long dt,
+ public HttpMessage(List<String> bodies, String groupId, String streamId, long dt,
long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
- this.bid = bid;
- this.tid = tid;
+ this.groupId = groupId;
+ this.streamId = streamId;
this.bodies = bodies;
this.callback = callback;
this.dt = dt;
@@ -46,12 +46,12 @@ public class HttpMessage {
this.timeUnit = timeUnit;
}
- public String getBid() {
- return bid;
+ public String getGroupId() {
+ return groupId;
}
- public String getTid() {
- return tid;
+ public String getStreamId() {
+ return streamId;
}
public List<String> getBodies() {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpProxySender.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpProxySender.java
index 37ffd82..ce78e41 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpProxySender.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/HttpProxySender.java
@@ -70,7 +70,7 @@ public class HttpProxySender extends Thread {
try {
proxyConfigManager = new ProxyConfigManager(configure,
Utils.getLocalIp(), null);
- proxyConfigManager.setBusinessId(configure.getBid());
+ proxyConfigManager.setGroupId(configure.getGroupId());
ProxyConfigEntry proxyConfigEntry = retryGettingProxyConfig();
hostList.addAll(proxyConfigEntry.getHostMap().values());
@@ -92,7 +92,7 @@ public class HttpProxySender extends Thread {
* @return proxy config entry.
*/
private ProxyConfigEntry retryGettingProxyConfig() throws Exception {
- return proxyConfigManager.getBidConfigure();
+ return proxyConfigManager.getGroupIdConfigure();
}
/**
@@ -106,7 +106,7 @@ public class HttpProxySender extends Thread {
int randSleepTime = proxyClientConfig.getProxyHttpUpdateIntervalMinutes() * 60 + rand;
TimeUnit.MILLISECONDS.sleep(randSleepTime * 1000);
if (proxyConfigManager != null) {
- ProxyConfigEntry proxyConfigEntry = proxyConfigManager.getBidConfigure();
+ ProxyConfigEntry proxyConfigEntry = proxyConfigManager.getGroupIdConfigure();
hostList.addAll(proxyConfigEntry.getHostMap().values());
hostList.retainAll(proxyConfigEntry.getHostMap().values());
} else {
@@ -125,38 +125,38 @@ public class HttpProxySender extends Thread {
* send by http
*
* @param body
- * @param bid
- * @param tid
+ * @param groupId
+ * @param streamId
* @param dt
* @param timeout
* @param timeUnit
* @return
*/
- public SendResult sendMessage(String body, String bid, String tid, long dt,
+ public SendResult sendMessage(String body, String groupId, String streamId, long dt,
long timeout, TimeUnit timeUnit) {
- return sendMessage(Collections.singletonList(body), bid, tid, dt, timeout, timeUnit);
+ return sendMessage(Collections.singletonList(body), groupId, streamId, dt, timeout, timeUnit);
}
/**
* send multiple messages.
*
* @param bodies list of bodies
- * @param bid
- * @param tid
+ * @param groupId
+ * @param streamId
* @param dt
* @param timeout
* @param timeUnit
* @return
*/
- public SendResult sendMessage(List<String> bodies, String bid, String tid, long dt,
+ public SendResult sendMessage(List<String> bodies, String groupId, String streamId, long dt,
long timeout, TimeUnit timeUnit) {
if (hostList.isEmpty()) {
logger.error("proxy list is empty, maybe client has been "
- + "closed or bid is not assigned with proxy list");
+ + "closed or groupId is not assigned with proxy list");
return SendResult.NO_CONNECTION;
}
return internalHttpSender.sendMessageWithHostInfo(
- bodies, bid, tid, dt, timeout, timeUnit);
+ bodies, groupId, streamId, dt, timeout, timeUnit);
}
@@ -164,17 +164,17 @@ public class HttpProxySender extends Thread {
* async sender
*
* @param bodies
- * @param bid
- * @param tid
+ * @param groupId
+ * @param streamId
* @param dt
* @param timeout
* @param timeUnit
* @param callback
*/
- public void asyncSendMessage(List<String> bodies, String bid, String tid, long dt,
+ public void asyncSendMessage(List<String> bodies, String groupId, String streamId, long dt,
long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
List<String> bodyList = new ArrayList<>(bodies);
- HttpMessage httpMessage = new HttpMessage(bodyList, bid, tid, dt,
+ HttpMessage httpMessage = new HttpMessage(bodyList, groupId, streamId, dt,
timeout, timeUnit, callback);
try {
if (!messageCache.offer(httpMessage)) {
@@ -200,16 +200,16 @@ public class HttpProxySender extends Thread {
* async send single message.
*
* @param body
- * @param bid
- * @param tid
+ * @param groupId
+ * @param streamId
* @param dt
* @param timeout
* @param timeUnit
* @param callback
*/
- public void asyncSendMessage(String body, String bid, String tid, long dt,
+ public void asyncSendMessage(String body, String groupId, String streamId, long dt,
long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
- asyncSendMessage(Collections.singletonList(body), bid, tid,
+ asyncSendMessage(Collections.singletonList(body), groupId, streamId,
dt, timeout, timeUnit, callback);
}
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/Sender.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/Sender.java
index f29630b..d4938df 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/Sender.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/network/Sender.java
@@ -77,7 +77,7 @@ public class Sender {
this.clientMgr = new ClientMgr(configure, this, selfDefineFactory);
ProxyConfigEntry proxyConfigEntry = null;
try {
- proxyConfigEntry = this.clientMgr.getBidConfigureInfo();
+ proxyConfigEntry = this.clientMgr.getGroupIdConfigureInfo();
setClusterId(proxyConfigEntry.getClusterId());
} catch (Throwable e) {
if (configure.isReadProxyIPFromLocal()) {
@@ -186,17 +186,17 @@ public class Sender {
return SendResult.INVALID_ATTRIBUTES;
}
if (encodeObject.getMsgtype() == 7) {
- int bidnum = 0;
- int tidnum = 0;
- if (encodeObject.getBid().equals(clientMgr.getBid())) {
- bidnum = clientMgr.getBidNum();
- tidnum = clientMgr.getTidMap().get(encodeObject.getTid()) != null
- ? clientMgr.getTidMap().get(encodeObject.getTid()) : 0;
+ int groupIdnum = 0;
+ int streamIdnum = 0;
+ if (encodeObject.getGroupId().equals(clientMgr.getGroupId())) {
+ groupIdnum = clientMgr.getGroupIdNum();
+ streamIdnum = clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null
+ ? clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) : 0;
}
- encodeObject.setBidNum(bidnum);
- encodeObject.setTidNum(tidnum);
- if (bidnum == 0 || tidnum == 0) {
- encodeObject.setBidTransfer(false);
+ encodeObject.setGroupIdNum(groupIdnum);
+ encodeObject.setStreamIdNum(streamIdnum);
+ if (groupIdnum == 0 || streamIdnum == 0) {
+ encodeObject.setGroupIdTransfer(false);
}
}
if (this.configure.isNeedDataEncry()) {
@@ -216,7 +216,7 @@ public class Sender {
public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID,
long timeout, TimeUnit timeUnit) {
metricWorker.recordNumByKey(encodeObject.getMessageId(),
- encodeObject.getBid(), encodeObject.getTid(),
+ encodeObject.getGroupId(), encodeObject.getStreamId(),
Utils.getLocalIp(), encodeObject.getDt(), encodeObject.getPackageTime(), encodeObject.getRealCnt());
NettyClient client = null;
SendResult message = null;
@@ -277,17 +277,17 @@ public class Sender {
}
if (encodeObject.getMsgtype() == 7) {
- int bidnum = 0;
- int tidnum = 0;
- if (encodeObject.getBid().equals(clientMgr.getBid())) {
- bidnum = clientMgr.getBidNum();
- tidnum = clientMgr.getTidMap().get(encodeObject.getTid()) != null
- ? clientMgr.getTidMap().get(encodeObject.getTid()) : 0;
+ int groupIdnum = 0;
+ int streamIdnum = 0;
+ if (encodeObject.getGroupId().equals(clientMgr.getGroupId())) {
+ groupIdnum = clientMgr.getGroupIdNum();
+ streamIdnum = clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null
+ ? clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) : 0;
}
- encodeObject.setBidNum(bidnum);
- encodeObject.setTidNum(tidnum);
- if (bidnum == 0 || tidnum == 0) {
- encodeObject.setBidTransfer(false);
+ encodeObject.setGroupIdNum(groupIdnum);
+ encodeObject.setStreamIdNum(streamIdnum);
+ if (groupIdnum == 0 || streamIdnum == 0) {
+ encodeObject.setGroupIdTransfer(false);
}
}
if (this.configure.isNeedDataEncry()) {
@@ -425,17 +425,17 @@ public class Sender {
msgQueueMap.put(encodeObject.getMessageId(), new QueueObject(System.currentTimeMillis(),
callback, size, timeout, timeUnit));
if (encodeObject.getMsgtype() == 7) {
- int bidnum = 0;
- int tidnum = 0;
- if ((clientMgr.getBid().length() != 0) && (encodeObject.getBid().equals(clientMgr.getBid()))) {
- bidnum = clientMgr.getBidNum();
- tidnum = (clientMgr.getTidMap().get(encodeObject.getTid()) != null)
- ? clientMgr.getTidMap().get(encodeObject.getTid()) : 0;
+ int groupIdnum = 0;
+ int streamIdnum = 0;
+ if ((clientMgr.getGroupId().length() != 0) && (encodeObject.getGroupId().equals(clientMgr.getGroupId()))) {
+ groupIdnum = clientMgr.getGroupIdNum();
+ streamIdnum = (clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null)
+ ? clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) : 0;
}
- encodeObject.setBidNum(bidnum);
- encodeObject.setTidNum(tidnum);
- if (bidnum == 0 || tidnum == 0) {
- encodeObject.setBidTransfer(false);
+ encodeObject.setGroupIdNum(groupIdnum);
+ encodeObject.setStreamIdNum(streamIdnum);
+ if (groupIdnum == 0 || streamIdnum == 0) {
+ encodeObject.setGroupIdTransfer(false);
}
}
if (this.configure.isNeedDataEncry()) {
@@ -495,8 +495,8 @@ public class Sender {
public void asyncSendMessage(EncodeObject encodeObject,
SendMessageCallback callback, String msgUUID,
long timeout, TimeUnit timeUnit) throws ProxysdkException {
- metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getBid(),
- encodeObject.getTid(), Utils.getLocalIp(), encodeObject.getPackageTime(),
+ metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(),
+ encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getPackageTime(),
encodeObject.getDt(), encodeObject.getRealCnt());
// send message package time
@@ -546,17 +546,17 @@ public class Sender {
logger.warn("message id {} has existed.", encodeObject.getMessageId());
}
if (encodeObject.getMsgtype() == 7) {
- int bidnum = 0;
- int tidnum = 0;
- if ((clientMgr.getBid().length() != 0) && (encodeObject.getBid().equals(clientMgr.getBid()))) {
- bidnum = clientMgr.getBidNum();
- tidnum = (clientMgr.getTidMap().get(encodeObject.getTid()) != null) ? clientMgr
- .getTidMap().get(encodeObject.getTid()) : 0;
+ int groupIdnum = 0;
+ int streamIdnum = 0;
+ if ((clientMgr.getGroupId().length() != 0) && (encodeObject.getGroupId().equals(clientMgr.getGroupId()))) {
+ groupIdnum = clientMgr.getGroupIdNum();
+ streamIdnum = (clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null) ? clientMgr
+ .getStreamIdMap().get(encodeObject.getStreamId()) : 0;
}
- encodeObject.setBidNum(bidnum);
- encodeObject.setTidNum(tidnum);
- if (bidnum == 0 || tidnum == 0) {
- encodeObject.setBidTransfer(false);
+ encodeObject.setGroupIdNum(groupIdnum);
+ encodeObject.setStreamIdNum(streamIdnum);
+ if (groupIdnum == 0 || streamIdnum == 0) {
+ encodeObject.setGroupIdTransfer(false);
}
}
if (this.configure.isNeedDataEncry()) {
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/threads/MetricWorkerThread.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/threads/MetricWorkerThread.java
index 47b5b60..1ecd39a 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/threads/MetricWorkerThread.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/threads/MetricWorkerThread.java
@@ -75,19 +75,19 @@ public class MetricWorkerThread extends Thread implements Closeable {
/**
* get string key
*
- * @param bid - bid
- * @param tid - tid
+ * @param groupId - groupId
+ * @param streamId - streamId
* @param localIp - ip
* @return
*/
- private String getKeyStringByConfig(String bid, String tid, String localIp, long keyTime) {
+ private String getKeyStringByConfig(String groupId, String streamId, String localIp, long keyTime) {
StringBuilder builder = new StringBuilder();
- String bidStr = proxyClientConfig.isUseBidAsKey() ? bid : DEFAULT_KEY_ITEM;
- String tidStr = proxyClientConfig.isUseTidAsKey() ? tid : DEFAULT_KEY_ITEM;
+ String groupIdStr = proxyClientConfig.isUseGroupIdAsKey() ? groupId : DEFAULT_KEY_ITEM;
+ String streamIdStr = proxyClientConfig.isUseStreamIdAsKey() ? streamId : DEFAULT_KEY_ITEM;
String localIpStr = proxyClientConfig.isUseLocalIpAsKey() ? localIp : DEFAULT_KEY_ITEM;
- builder.append(bidStr).append(DEFAULT_KEY_SPLITTER)
- .append(tidStr).append(DEFAULT_KEY_SPLITTER)
+ builder.append(groupIdStr).append(DEFAULT_KEY_SPLITTER)
+ .append(streamIdStr).append(DEFAULT_KEY_SPLITTER)
.append(localIpStr).append(DEFAULT_KEY_SPLITTER)
.append(keyTime);
return builder.toString();
@@ -97,18 +97,19 @@ public class MetricWorkerThread extends Thread implements Closeable {
* record num
*
* @param msgId - msg uuid
- * @param bid - bid
- * @param tid - tid
+ * @param groupId - groupId
+ * @param streamId - streamId
* @param localIp - ip
* @param packTime - package time
* @param dt - dt
* @param num - num
*/
- public void recordNumByKey(String msgId, String bid, String tid, String localIp, long packTime, long dt, int num) {
+ public void recordNumByKey(String msgId, String groupId, String streamId,
+ String localIp, long packTime, long dt, int num) {
if (!enableSlaMetric) {
return;
}
- MessageRecord messageRecord = new MessageRecord(bid, tid, localIp, msgId,
+ MessageRecord messageRecord = new MessageRecord(groupId, streamId, localIp, msgId,
getFormatKeyTime(dt), getFormatKeyTime(packTime), num);
metricValueCache.putIfAbsent(msgId, messageRecord);
@@ -135,9 +136,9 @@ public class MetricWorkerThread extends Thread implements Closeable {
MessageRecord messageRecord = metricValueCache.remove(msgId);
if (messageRecord != null) {
- String packTimeKeyName = getKeyStringByConfig(messageRecord.getBid(), messageRecord.getTid(),
+ String packTimeKeyName = getKeyStringByConfig(messageRecord.getGroupId(), messageRecord.getStreamId(),
messageRecord.getLocalIp(), messageRecord.getPackTime());
- String dtKeyName = getKeyStringByConfig(messageRecord.getBid(), messageRecord.getTid(),
+ String dtKeyName = getKeyStringByConfig(messageRecord.getGroupId(), messageRecord.getStreamId(),
messageRecord.getLocalIp(), messageRecord.getDt());
MetricTimeNumSummary packTimeSummary = getMetricSummary(packTimeKeyName,
@@ -160,9 +161,9 @@ public class MetricWorkerThread extends Thread implements Closeable {
MessageRecord messageRecord = metricValueCache.remove(msgId);
if (messageRecord != null) {
- String packTimeKeyName = getKeyStringByConfig(messageRecord.getBid(), messageRecord.getTid(),
+ String packTimeKeyName = getKeyStringByConfig(messageRecord.getGroupId(), messageRecord.getStreamId(),
messageRecord.getLocalIp(), messageRecord.getPackTime());
- String dtKeyName = getKeyStringByConfig(messageRecord.getBid(), messageRecord.getTid(),
+ String dtKeyName = getKeyStringByConfig(messageRecord.getGroupId(), messageRecord.getStreamId(),
messageRecord.getLocalIp(), messageRecord.getDt());
MetricTimeNumSummary packTimeSummary = getMetricSummary(packTimeKeyName,
@@ -213,11 +214,11 @@ public class MetricWorkerThread extends Thread implements Closeable {
}
}
- private void sendSingleLine(String line, String tid, long dtTime) {
+ private void sendSingleLine(String line, String streamId, long dtTime) {
EncodeObject encodeObject = new EncodeObject(line.getBytes(), 7,
false, false, false,
dtTime, idGenerator.getNextInt(),
- proxyClientConfig.getMetricBid(), tid, "", "", Utils.getLocalIp());
+ proxyClientConfig.getMetricGroupId(), streamId, "", "", Utils.getLocalIp());
MetricSendCallBack callBack = new MetricSendCallBack(encodeObject);
tryToSendMetricToManager(encodeObject, callBack);
}
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ProxyUtils.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ProxyUtils.java
index a786fe6..20be209 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ProxyUtils.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ProxyUtils.java
@@ -35,7 +35,7 @@ public class ProxyUtils {
private static final Set<String> invalidAttr = new HashSet<>();
static {
- Collections.addAll(invalidAttr, "bid", "tid", "dt", "msgUUID", "cp",
+ Collections.addAll(invalidAttr, "groupId", "streamId", "dt", "msgUUID", "cp",
"cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", "_file_status_check", "_secretId",
"_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyDesKey");
}
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ServiceDiscoveryUtils.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ServiceDiscoveryUtils.java
index ebe88f5..05d05a7 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ServiceDiscoveryUtils.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/utils/ServiceDiscoveryUtils.java
@@ -224,7 +224,7 @@ public class ServiceDiscoveryUtils {
try {
File managerIpListFile = new File(managerIpLocalPath);
if (!managerIpListFile.exists()) {
- log.info("ServiceDiscovery no found local bidInfo file, "
+ log.info("ServiceDiscovery no found local groupIdInfo file, "
+ "doesn't matter, path is [" + managerIpLocalPath + "].");
return null;
}
diff --git a/inlong-dataproxy/bin/prepare_env.sh b/inlong-dataproxy/bin/prepare_env.sh
index 5cc59f4..7883b4e 100755
--- a/inlong-dataproxy/bin/prepare_env.sh
+++ b/inlong-dataproxy/bin/prepare_env.sh
@@ -21,7 +21,7 @@
cd "$(dirname "$0")"/../conf
-for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,bid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties}
+for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,groupid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties}
do
touch $i
done
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/ProxyMessage.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/ProxyMessage.java
index 0a0b24c..1be867e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/ProxyMessage.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/ProxyMessage.java
@@ -23,28 +23,28 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants;
public class ProxyMessage {
- private String bid;
+ private String groupId;
private String topic;
- private String tid;
+ private String streamId;
private Map<String, String> attributeMap;
private byte[] data;
- public ProxyMessage(String bid, String tid, Map<String, String> attributeMap, byte[] data) {
- this.bid = bid;
- this.tid = tid;
+ public ProxyMessage(String groupId, String streamId, Map<String, String> attributeMap, byte[] data) {
+ this.groupId = groupId;
+ this.streamId = streamId;
this.attributeMap = attributeMap;
this.data = data;
}
- public String getBid() {
- return bid;
+ public String getGroupId() {
+ return groupId;
}
- public void setBid(String bid) {
- this.bid = bid;
- this.attributeMap.put(AttributeConstants.BUSINESS_ID, bid);
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ this.attributeMap.put(AttributeConstants.GROUP_ID, groupId);
}
public String getTopic() {
@@ -55,13 +55,13 @@ public class ProxyMessage {
this.topic = topic;
}
- public String getTid() {
- return tid;
+ public String getStreamId() {
+ return streamId;
}
- public void setTid(String tid) {
- this.tid = tid;
- this.attributeMap.put(AttributeConstants.INTERFACE_ID, tid);
+ public void setStreamId(String streamId) {
+ this.streamId = streamId;
+ this.attributeMap.put(AttributeConstants.INTERFACE_ID, streamId);
}
public Map<String, String> getAttributeMap() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 8fbbd33..1c3caa5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -34,7 +34,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.inlong.dataproxy.config.RemoteConfigJson.DataItem;
-import org.apache.inlong.dataproxy.config.holder.BidPropertiesHolder;
+import org.apache.inlong.dataproxy.config.holder.GroupIdPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.FileConfigHolder;
import org.apache.inlong.dataproxy.config.holder.MxPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.PropertiesConfigHolder;
@@ -57,8 +57,8 @@ public class ConfigManager {
private final PropertiesConfigHolder topicConfig =
new PropertiesConfigHolder("topics.properties");
private final MxPropertiesHolder mxConfig = new MxPropertiesHolder("mx.properties");
- private final BidPropertiesHolder bidConfig =
- new BidPropertiesHolder("bid_mapping.properties");
+ private final GroupIdPropertiesHolder groupIdConfig =
+ new GroupIdPropertiesHolder("groupid_mapping.properties");
private final PropertiesConfigHolder dcConfig =
new PropertiesConfigHolder("dc_mapping.properties");
private final PropertiesConfigHolder transferConfig =
@@ -168,16 +168,16 @@ public class ConfigManager {
return mxConfig.getMxPropertiesMaps();
}
- public Map<String, String> getBidMappingProperties() {
- return bidConfig.getBidMappingProperties();
+ public Map<String, String> getGroupIdMappingProperties() {
+ return groupIdConfig.getGroupIdMappingProperties();
}
- public Map<String, Map<String, String>> getTidMappingProperties() {
- return bidConfig.getTidMappingProperties();
+ public Map<String, Map<String, String>> getStreamIdMappingProperties() {
+ return groupIdConfig.getStreamIdMappingProperties();
}
- public Map<String, String> getBidEnableMappingProperties() {
- return bidConfig.getBidEnableMappingProperties();
+ public Map<String, String> getGroupIdEnableMappingProperties() {
+ return groupIdConfig.getGroupIdEnableMappingProperties();
}
public Map<String, String> getCommonProperties() {
@@ -258,19 +258,19 @@ public class ConfigManager {
// request with post
CloseableHttpResponse response = httpClient.execute(httpGet);
String returnStr = EntityUtils.toString(response.getEntity());
- // get bid <-> topic and m value.
+ // get groupId <-> topic and m value.
RemoteConfigJson configJson = gson.fromJson(returnStr, RemoteConfigJson.class);
- Map<String, String> bidToTopic = new HashMap<String, String>();
- Map<String, String> bidToMValue = new HashMap<String, String>();
+ Map<String, String> groupIdToTopic = new HashMap<String, String>();
+ Map<String, String> groupIdToMValue = new HashMap<String, String>();
if (configJson.getErrCode() == 0) {
for (DataItem item : configJson.getData()) {
- bidToMValue.put(item.getBid(), item.getM());
- bidToTopic.put(item.getBid(), item.getTopic());
+ groupIdToMValue.put(item.getGroupId(), item.getM());
+ groupIdToTopic.put(item.getGroupId(), item.getTopic());
}
- configManager.addMxProperties(bidToMValue);
- configManager.addTopicProperties(bidToTopic);
+ configManager.addMxProperties(groupIdToMValue);
+ configManager.addTopicProperties(groupIdToTopic);
}
} catch (Exception ex) {
LOG.error("exception caught", ex);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
index 44e932e..91b5413 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
@@ -35,12 +35,12 @@ public class RemoteConfigJson {
public static class DataItem {
- private String bid;
+ private String groupId;
private String topic;
private String m;
- public String getBid() {
- return bid;
+ public String getGroupId() {
+ return groupId;
}
public String getTopic() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
index 6137886..ae8a035 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
@@ -187,7 +187,7 @@ public class RemoteConfigManager implements IRepository {
// request with get
CloseableHttpResponse response = httpClient.execute(httpGet);
String returnStr = EntityUtils.toString(response.getEntity());
- // get bid <-> topic and m value.
+ // get groupId <-> topic and m value.
DataProxyConfigResponse proxyResponse = gson.fromJson(returnStr, DataProxyConfigResponse.class);
if (!proxyResponse.isResult()) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BidPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
similarity index 54%
rename from inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BidPropertiesHolder.java
rename to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
index 5bb6f82..896b059 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BidPropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
@@ -25,21 +25,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * bid to m value
+ * groupId to m value
*/
-public class BidPropertiesHolder extends PropertiesConfigHolder {
+public class GroupIdPropertiesHolder extends PropertiesConfigHolder {
- private static final Logger LOG = LoggerFactory.getLogger(BidPropertiesHolder.class);
- private static final String BID_VALUE_SPLITTER = "#";
+ private static final Logger LOG = LoggerFactory.getLogger(GroupIdPropertiesHolder.class);
+ private static final String GROUPID_VALUE_SPLITTER = "#";
- private Map<String, String> bidMappingProperties =
+ private Map<String, String> groupIdMappingProperties =
new HashMap<String, String>();
- private Map<String, Map<String, String>> tidMappingProperties =
+ private Map<String, Map<String, String>> streamIdMappingProperties =
new HashMap<String, Map<String, String>>();
- private Map<String, String> bidEnableMappingProperties =
+ private Map<String, String> groupIdEnableMappingProperties =
new HashMap<String, String>();
- public BidPropertiesHolder(String fileName) {
+ public GroupIdPropertiesHolder(String fileName) {
super(fileName);
}
@@ -47,41 +47,41 @@ public class BidPropertiesHolder extends PropertiesConfigHolder {
public void loadFromFileToHolder() {
super.loadFromFileToHolder();
try {
- Map<String, String> tmpBidMappingProperties =
+ Map<String, String> tmpGroupIdMappingProperties =
new HashMap<String, String>();
- Map<String, Map<String, String>> tmpTidMappingProperties =
+ Map<String, Map<String, String>> tmpStreamIdMappingProperties =
new HashMap<String, Map<String, String>>();
- Map<String, String> tmpBidEnableMappingProperties = new HashMap<String, String>();
+ Map<String, String> tmpGroupIdEnableMappingProperties = new HashMap<String, String>();
for (Map.Entry<String, String> entry : super.getHolder().entrySet()) {
- String[] sArray = StringUtils.split(entry.getKey(), BID_VALUE_SPLITTER);
+ String[] sArray = StringUtils.split(entry.getKey(), GROUPID_VALUE_SPLITTER);
if (sArray.length != 3) {
- LOG.warn("invalid bid key {}", entry.getKey());
+ LOG.warn("invalid groupId key {}", entry.getKey());
continue;
}
- tmpBidMappingProperties.put(sArray[0].trim(), sArray[1].trim());
- tmpBidEnableMappingProperties.put(sArray[0].trim(), sArray[2].trim());
+ tmpGroupIdMappingProperties.put(sArray[0].trim(), sArray[1].trim());
+ tmpGroupIdEnableMappingProperties.put(sArray[0].trim(), sArray[2].trim());
if (StringUtils.isNotBlank(entry.getValue())) {
- tmpTidMappingProperties.put(sArray[0].trim(),
+ tmpStreamIdMappingProperties.put(sArray[0].trim(),
MAP_SPLITTER.split(entry.getValue()));
}
}
- bidMappingProperties = tmpBidMappingProperties;
- tidMappingProperties = tmpTidMappingProperties;
- bidEnableMappingProperties = tmpBidEnableMappingProperties;
+ groupIdMappingProperties = tmpGroupIdMappingProperties;
+ streamIdMappingProperties = tmpStreamIdMappingProperties;
+ groupIdEnableMappingProperties = tmpGroupIdEnableMappingProperties;
} catch (Exception e) {
LOG.error("loadConfig error :", e);
}
}
- public Map<String, String> getBidMappingProperties() {
- return bidMappingProperties;
+ public Map<String, String> getGroupIdMappingProperties() {
+ return groupIdMappingProperties;
}
- public Map<String, Map<String, String>> getTidMappingProperties() {
- return tidMappingProperties;
+ public Map<String, Map<String, String>> getStreamIdMappingProperties() {
+ return streamIdMappingProperties;
}
- public Map<String, String> getBidEnableMappingProperties() {
- return bidEnableMappingProperties;
+ public Map<String, String> getGroupIdEnableMappingProperties() {
+ return groupIdEnableMappingProperties;
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
index 168ea87..b6f46da 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
@@ -54,27 +54,27 @@ public class ConfigMessageServlet extends HttpServlet {
}
private boolean handleTopicConfig(RequestContent requestContent) {
- Map<String, String> bidToTopic = new HashMap<String, String>();
+ Map<String, String> groupIdToTopic = new HashMap<String, String>();
for (Map<String, String> item : requestContent.getContent()) {
- bidToTopic.put(item.get("bid"), item.get("topic"));
+ groupIdToTopic.put(item.get("groupId"), item.get("topic"));
}
if ("add".equals(requestContent.getOperationType())) {
- return configManager.addTopicProperties(bidToTopic);
+ return configManager.addTopicProperties(groupIdToTopic);
} else if ("delete".equals(requestContent.getOperationType())) {
- return configManager.deleteTopicProperties(bidToTopic);
+ return configManager.deleteTopicProperties(groupIdToTopic);
}
return false;
}
private boolean handleMxConfig(RequestContent requestContent) {
- Map<String, String> bidToMValue = new HashMap<String, String>();
+ Map<String, String> groupIdToMValue = new HashMap<String, String>();
for (Map<String, String> item : requestContent.getContent()) {
- bidToMValue.put(item.get("bid"), item.get("m"));
+ groupIdToMValue.put(item.get("groupId"), item.get("m"));
}
if ("add".equals(requestContent.getOperationType())) {
- return configManager.addMxProperties(bidToMValue);
+ return configManager.addMxProperties(groupIdToMValue);
} else if ("delete".equals(requestContent.getOperationType())) {
- return configManager.deleteMxProperties(bidToMValue);
+ return configManager.deleteMxProperties(groupIdToMValue);
}
return false;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
index bb63a7b..7290089 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
@@ -29,18 +29,18 @@ public interface AttributeConstants {
String OPERATION_CONTENT = "content";
/**
- * business id unique string id for each business or product
+ * group id unique string id for each business or product
*/
- String BUSINESS_ID = "bid";
+ String GROUP_ID = "groupId";
/**
* interface id unique string id for each interface of business An interface stand for a kind of
* data
*/
- String INTERFACE_ID = "tid";
+ String INTERFACE_ID = "streamId";
/**
- * iname is like a tid but used in file protocol(m=xxx)
+ * iname is like a streamId but used in file protocol(m=xxx)
*/
String INAME = "iname";
@@ -77,7 +77,7 @@ public interface AttributeConstants {
String NUM2NAME = "num2name";
- String BID_NUM = "bidnum";
+ String GROUPID_NUM = "groupIdnum";
- String TID_NUM = "tidnum";
+ String STREAMID_NUM = "streamIdnum";
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index b906386..fedcc80 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -99,7 +99,7 @@ public class ConfigConstants {
public static final String FILE_CHECK_DATA = "file-check-data";
public static final String MINUTE_CHECK_DATA = "minute-check-data";
public static final String SLA_METRIC_DATA = "sla-metric-data";
- public static final String SLA_METRIC_BID = "manager_sla_metric";
+ public static final String SLA_METRIC_GROUPID = "manager_sla_metric";
public static final String FILE_BODY = "file-body";
public static final int MSG_MAX_LENGTH_BYTES = 20 * 1024 * 1024;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
index 658dea9..034b2b4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
@@ -89,8 +89,8 @@ public class MetaSink extends AbstractSink implements Configurable {
private static String MAX_TOPICS_EACH_PRODUCER_HOLD_NAME = "max-topic-each-producer-hold";
private static final String LOG_TOPIC = "proxy-log-topic";
- private static final String TID = "proxy-log-tid";
- private static final String BID = "proxy-log-bid";
+ private static final String STREAMID = "proxy-log-streamid";
+ private static final String GROUPID = "proxy-log-groupid";
private static final String SEND_REMOTE = "send-remote";
private static final String topicsFilePath = "topics.properties";
private static final String slaTopicFilePath = "slaTopics.properties";
@@ -104,8 +104,8 @@ public class MetaSink extends AbstractSink implements Configurable {
private int maxSurvivedSize = 100000;
private String proxyLogTopic = "teg_manager";
- private String proxyLogBid = "b_teg_manager";
- private String proxyLogTid = "proxy_measure_log";
+ private String proxyLogGroupId = "b_teg_manager";
+ private String proxyLogStreamId = "proxy_measure_log";
private boolean sendRemote = false;
private ConfigManager configManager;
private Map<String, String> topicProperties;
@@ -371,13 +371,13 @@ public class MetaSink extends AbstractSink implements Configurable {
} else {
Message message = new Message(topic, event.getBody());
message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
- String tid = "";
+ String streamId = "";
if (event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
- tid = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
+ streamId = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
} else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
- tid = event.getHeaders().get(AttributeConstants.INAME);
+ streamId = event.getHeaders().get(AttributeConstants.INAME);
}
- message.putSystemHeader(tid, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+ message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
producer.sendMessage(message, new MyCallback(es));
flag.set(true);
@@ -399,13 +399,13 @@ public class MetaSink extends AbstractSink implements Configurable {
Message message = new Message(topic, event.getBody());
message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
- String tid = "";
+ String streamId = "";
if (event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
- tid = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
+ streamId = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
} else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
- tid = event.getHeaders().get(AttributeConstants.INAME);
+ streamId = event.getHeaders().get(AttributeConstants.INAME);
}
- message.putSystemHeader(tid, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+ message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
producer.sendMessage(message, new MyCallback(es));
flag.set(true);
@@ -700,8 +700,8 @@ public class MetaSink extends AbstractSink implements Configurable {
}
if (sendRemote) {
proxyLogTopic = context.getString(LOG_TOPIC, proxyLogTopic);
- proxyLogBid = context.getString(BID, proxyLogTid);
- proxyLogTid = context.getString(TID, proxyLogTid);
+ proxyLogGroupId = context.getString(GROUPID, proxyLogStreamId);
+ proxyLogStreamId = context.getString(STREAMID, proxyLogStreamId);
}
resendQueue = new LinkedBlockingQueue<>(BAD_EVENT_QUEUE_SIZE);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index e56853a..0fb50ba 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -413,16 +413,16 @@ public class PulsarSink extends AbstractSink implements Configurable,
private void editStatistic(final Event event, String keyPostfix, String msgId) {
String topic = "";
- String tid = "";
+ String streamId = "";
String nodeIp = null;
if (event != null) {
if (event.getHeaders().containsKey(TOPIC)) {
topic = event.getHeaders().get(TOPIC);
}
if (event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
- tid = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
+ streamId = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
} else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
- tid = event.getHeaders().get(AttributeConstants.INAME);
+ streamId = event.getHeaders().get(AttributeConstants.INAME);
}
/*
@@ -459,12 +459,12 @@ public class PulsarSink extends AbstractSink implements Configurable,
}
/*
- * SINK_INTF#metasink1#topic#tid#clientIp#busIP#pkgTime#successCnt#packcnt
+ * SINK_INTF#metasink1#topic#streamId#clientIp#busIP#pkgTime#successCnt#packcnt
* #packsize#failCnt
*/
StringBuilder newbase = new StringBuilder();
newbase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
- .append(tid).append(SEPARATOR).append(nodeIp)
+ .append(streamId).append(SEPARATOR).append(nodeIp)
.append(SEPARATOR).append(NetworkUtils.getLocalIp())
.append(SEPARATOR).append(msgId).append(SEPARATOR)
.append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 7aae1b6..ebf0822 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -155,13 +155,13 @@ public class PulsarClientService {
Map<String, String> proMap = new HashMap<>();
proMap.put("tdbusip", localIp);
- String tid = "";
+ String streamId = "";
if (event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
- tid = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
+ streamId = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
} else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
- tid = event.getHeaders().get(AttributeConstants.INAME);
+ streamId = event.getHeaders().get(AttributeConstants.INAME);
}
- proMap.put(tid, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+ proMap.put(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
logger.debug("producer send msg!");
TopicProducerInfo forCallBackP = producer;
forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index 468e9f6..6d36fbf 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -255,8 +255,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
+ ";Connection info:" + channel.toString()));
}
- int bidNum = cb.readUnsignedShort();
- int tidNum = cb.readUnsignedShort();
+ int groupIdNum = cb.readUnsignedShort();
+ int streamIdNum = cb.readUnsignedShort();
final int extendField = cb.readUnsignedShort();
long dataTime = cb.readUnsignedInt();
int msgCount = cb.readUnsignedShort();
@@ -299,35 +299,35 @@ public class DefaultServiceDecoder implements ServiceDecoder {
ByteBuffer dataBuf = handleTrace(channel, cb, extendField, msgHeadPos,
totalDataLen, attrLen, strAttr, bodyLen);
- String bid = null;
- String tid = null;
+ String groupId = null;
+ String streamId = null;
- if (commonAttrMap.containsKey(AttributeConstants.BUSINESS_ID)) {
- bid = commonAttrMap.get(AttributeConstants.BUSINESS_ID);
+ if (commonAttrMap.containsKey(AttributeConstants.GROUP_ID)) {
+ groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
}
if (commonAttrMap.containsKey(AttributeConstants.INTERFACE_ID)) {
- tid = commonAttrMap.get(AttributeConstants.INTERFACE_ID);
+ streamId = commonAttrMap.get(AttributeConstants.INTERFACE_ID);
}
- if ((bid != null) && (tid != null)) {
+ if ((groupId != null) && (streamId != null)) {
commonAttrMap.put(AttributeConstants.NUM2NAME, "FALSE");
dataBuf.putShort(BIN_MSG_EXTEND_OFFSET, (short) (extendField | 0x4));
} else {
- boolean hasNumBid = (((extendField & 0x4) >> 2) == 0x0);
- if (hasNumBid && (0 != bidNum) && (0 != tidNum)) {
+ boolean hasNumGroupId = (((extendField & 0x4) >> 2) == 0x0);
+ if (hasNumGroupId && (0 != groupIdNum) && (0 != streamIdNum)) {
commonAttrMap.put(AttributeConstants.NUM2NAME, "TRUE");
- commonAttrMap.put(AttributeConstants.BID_NUM, String.valueOf(bidNum));
- commonAttrMap.put(AttributeConstants.TID_NUM, String.valueOf(tidNum));
+ commonAttrMap.put(AttributeConstants.GROUPID_NUM, String.valueOf(groupIdNum));
+ commonAttrMap.put(AttributeConstants.STREAMID_NUM, String.valueOf(streamIdNum));
}
}
if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType) && !index) {
List<ProxyMessage> msgList = new ArrayList<>(1);
- msgList.add(new ProxyMessage(bid, tid, commonAttrMap, dataBuf.array()));
+ msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, dataBuf.array()));
resultMap.put(ConfigConstants.MSG_LIST, msgList);
} else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
List<ProxyMessage> msgList = new ArrayList<>(1);
- msgList.add(new ProxyMessage(bid, tid, commonAttrMap,
+ msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap,
(byte[]) resultMap.get(ConfigConstants.FILE_BODY)));
resultMap.put(ConfigConstants.MSG_LIST, msgList);
}
@@ -411,8 +411,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
// fill up attr map with some keys.
commonAttrMap.put(AttributeConstants.RCV_TIME, String.valueOf(System.currentTimeMillis()));
- String bid = commonAttrMap.get(AttributeConstants.BUSINESS_ID);
- String tid = commonAttrMap.get(AttributeConstants.INTERFACE_ID);
+ String groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
+ String streamId = commonAttrMap.get(AttributeConstants.INTERFACE_ID);
// add message count attr
String cntStr = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
@@ -433,12 +433,12 @@ public class DefaultServiceDecoder implements ServiceDecoder {
byte[] record = new byte[singleMsgLen];
bodyBuffer.get(record);
- ProxyMessage message = new ProxyMessage(bid, tid, commonAttrMap, record);
+ ProxyMessage message = new ProxyMessage(groupId, streamId, commonAttrMap, record);
msgList.add(message);
}
} else {
msgList = new ArrayList<>(1);
- msgList.add(new ProxyMessage(bid, tid, commonAttrMap, bodyData));
+ msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, bodyData));
}
resultMap.put(ConfigConstants.MSG_LIST, msgList);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 46d5b35..eadf2bc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -18,7 +18,7 @@
package org.apache.inlong.dataproxy.source;
import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_BID;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
@@ -214,29 +214,29 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
}
- private void checkBidInfo(ProxyMessage message, Map<String, String> commonAttrMap,
+ private void checkGroupIdInfo(ProxyMessage message, Map<String, String> commonAttrMap,
Map<String, String> attrMap, AtomicReference<String> topicInfo) {
- String bid = message.getBid();
- String tid;
- if (null != bid) {
+ String groupId = message.getGroupId();
+ String streamId;
+ if (null != groupId) {
String from = commonAttrMap.get(AttributeConstants.FROM);
if ("dc".equals(from)) {
- String dcInterfaceId = message.getTid();
+ String dcInterfaceId = message.getStreamId();
if (StringUtils.isNotEmpty(dcInterfaceId)
&& configManager.getDcMappingProperties()
.containsKey(dcInterfaceId.trim())) {
- bid = configManager.getDcMappingProperties()
+ groupId = configManager.getDcMappingProperties()
.get(dcInterfaceId.trim()).trim();
- message.setBid(bid);
+ message.setGroupId(groupId);
}
}
- String value = configManager.getTopicProperties().get(bid);
+ String value = configManager.getTopicProperties().get(groupId);
if (StringUtils.isNotEmpty(value)) {
topicInfo.set(value.trim());
}
- Map<String, String> mxValue = configManager.getMxPropertiesMaps().get(bid);
+ Map<String, String> mxValue = configManager.getMxPropertiesMaps().get(groupId);
if (mxValue != null && mxValue.size() != 0) {
message.getAttributeMap().putAll(mxValue);
} else {
@@ -244,30 +244,30 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
} else {
String num2name = commonAttrMap.get(AttributeConstants.NUM2NAME);
- String bidNum = commonAttrMap.get(AttributeConstants.BID_NUM);
- String tidNum = commonAttrMap.get(AttributeConstants.TID_NUM);
-
- if (configManager.getBidMappingProperties() != null
- && configManager.getTidMappingProperties() != null) {
- bid = configManager.getBidMappingProperties().get(bidNum);
- tid = (configManager.getTidMappingProperties().get(bidNum) == null)
- ? null : configManager.getTidMappingProperties().get(bidNum).get(tidNum);
- if (bid != null && tid != null) {
+ String groupIdNum = commonAttrMap.get(AttributeConstants.GROUPID_NUM);
+ String streamIdNum = commonAttrMap.get(AttributeConstants.STREAMID_NUM);
+
+ if (configManager.getGroupIdMappingProperties() != null
+ && configManager.getStreamIdMappingProperties() != null) {
+ groupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
+ streamId = (configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
+ ? null : configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+ if (groupId != null && streamId != null) {
String enableTrans =
- (configManager.getBidEnableMappingProperties() == null)
- ? null : configManager.getBidEnableMappingProperties().get(bidNum);
+ (configManager.getGroupIdEnableMappingProperties() == null)
+ ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE"
.equalsIgnoreCase(num2name))) {
- String extraAttr = "bid=" + bid + "&" + "tid=" + tid;
+ String extraAttr = "groupId=" + groupId + "&" + "streamId=" + streamId;
message.setData(newBinMsg(message.getData(), extraAttr));
}
- attrMap.put(AttributeConstants.BUSINESS_ID, bid);
- attrMap.put(AttributeConstants.INTERFACE_ID, tid);
- message.setBid(bid);
- message.setTid(tid);
+ attrMap.put(AttributeConstants.GROUP_ID, groupId);
+ attrMap.put(AttributeConstants.INTERFACE_ID, streamId);
+ message.setGroupId(groupId);
+ message.setStreamId(streamId);
- String value = configManager.getTopicProperties().get(bid);
+ String value = configManager.getTopicProperties().get(groupId);
if (StringUtils.isNotEmpty(value)) {
topicInfo.set(value.trim());
}
@@ -285,25 +285,25 @@ public class ServerMessageHandler extends SimpleChannelHandler {
String topic = this.defaultTopic;
AtomicReference<String> topicInfo = new AtomicReference<>(topic);
- checkBidInfo(message, commonAttrMap, attrMap, topicInfo);
+ checkGroupIdInfo(message, commonAttrMap, attrMap, topicInfo);
topic = topicInfo.get();
-// if(bid==null)bid="b_test";//default bid
+// if(groupId==null)groupId="b_test";//default groupId
message.setTopic(topic);
commonAttrMap.put(AttributeConstants.NODE_IP, strRemoteIP);
- String bid = message.getBid();
- String tid = message.getTid();
+ String groupId = message.getGroupId();
+ String streamId = message.getStreamId();
// whether sla
- if (SLA_METRIC_BID.equals(bid)) {
+ if (SLA_METRIC_GROUPID.equals(groupId)) {
commonAttrMap.put(SLA_METRIC_DATA, "true");
message.setTopic(SLA_METRIC_DATA);
}
- if (bid != null && tid != null) {
- String tubeSwtichKey = bid + SEPARATOR + tid;
+ if (groupId != null && streamId != null) {
+ String tubeSwtichKey = groupId + SEPARATOR + streamId;
if (configManager.getTubeSwitchProperties().get(tubeSwtichKey) != null
&& "false".equals(configManager.getTubeSwitchProperties()
.get(tubeSwtichKey).trim())) {
@@ -326,14 +326,14 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
}
- if (tid == null) {
- tid = "";
+ if (streamId == null) {
+ streamId = "";
}
- HashMap<String, List<ProxyMessage>> tidMsgMap = messageMap
+ HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap
.computeIfAbsent(topic, k -> new HashMap<>());
- List<ProxyMessage> tidMsgList = tidMsgMap
- .computeIfAbsent(tid, k -> new ArrayList<>());
- tidMsgList.add(message);
+ List<ProxyMessage> streamIdMsgList = streamIdMsgMap
+ .computeIfAbsent(streamId, k -> new ArrayList<>());
+ streamIdMsgList.add(message);
}
}
@@ -349,11 +349,11 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) {
- for (Map.Entry<String, List<ProxyMessage>> tidEntry : topicEntry.getValue().entrySet()) {
+ for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
TDMsg1 tdMsg = TDMsg1.newTDMsg(this.isCompressed, tdMsgVer);
Map<String, String> headers = new HashMap<String, String>();
- for (ProxyMessage message : tidEntry.getValue()) {
+ for (ProxyMessage message : streamIdEntry.getValue()) {
if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
tdMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
@@ -382,7 +382,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
- headers.put(AttributeConstants.INTERFACE_ID, tidEntry.getKey());
+ headers.put(AttributeConstants.INTERFACE_ID, streamIdEntry.getKey());
headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
// every message share the same msg cnt? what if msgType = 5
@@ -396,7 +396,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
if (StringUtils.isNotEmpty(sequenceId)) {
StringBuilder sidBuilder = new StringBuilder();
- sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(tidEntry.getKey())
+ sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey())
.append(SEPARATOR).append(sequenceId);
headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
}
@@ -412,8 +412,8 @@ public class ServerMessageHandler extends SimpleChannelHandler {
throw new MessageIDException(uniqVal,
ErrorCode.DT_ERROR,
new Throwable("attribute dt=" + headers.get(AttributeConstants.DATA_TIME
- + " has error, detail is: topic=" + topicEntry.getKey() + "&tid="
- + tidEntry.getKey() + "&NodeIP=" + strRemoteIP), e1));
+ + " has error, detail is: topic=" + topicEntry.getKey() + "&streamId="
+ + streamIdEntry.getKey() + "&NodeIP=" + strRemoteIP), e1));
}
dtten = dtten / 1000 / 60 / 10;