Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/13 11:39:17 UTC
[GitHub] [inlong] gosonzhang opened a new pull request, #5882: [INLONG-5880][DataProxy] Add data reporting time process logic
gosonzhang opened a new pull request, #5882:
URL: https://github.com/apache/inlong/pull/5882
- Fixes #5880
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [inlong] gosonzhang commented on a diff in pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic
Posted by GitBox <gi...@apache.org>.
gosonzhang commented on code in PR #5882:
URL: https://github.com/apache/inlong/pull/5882#discussion_r970217694
##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java:
##########
@@ -88,7 +89,12 @@ public static void add(int auditID, Event event) {
String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers);
String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers);
long logTime = getLogTime(headers);
- AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+ long msgCount = 1L;
+ if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
Review Comment:
@luchunliang, the header is set by the source code, so it should never be missing; msgCount defaults to 1 only to guard against extreme cases.
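
The defaulting behaviour described in this reply can be sketched as follows. This is a minimal illustration rather than the code merged in the PR: the class name, the `resolveMsgCount` helper, and the literal value given to `MSG_COUNTER_KEY` are assumptions, since the thread shows only the first lines of the changed hunk.

```java
import java.util.Map;

final class MsgCountSketch {
    // Assumed stand-in for ConfigConstants.MSG_COUNTER_KEY; the real value is not shown in this thread.
    static final String MSG_COUNTER_KEY = "msgcnt";

    // Returns the message count carried in the headers, or 1 when the
    // header is absent, not a number, or not positive.
    static long resolveMsgCount(Map<String, String> headers) {
        long msgCount = 1L;
        String raw = headers.get(MSG_COUNTER_KEY);
        if (raw != null) {
            try {
                long parsed = Long.parseLong(raw.trim());
                if (parsed > 0) {
                    msgCount = parsed;
                }
            } catch (NumberFormatException e) {
                // Keep the default of 1 for malformed values.
            }
        }
        return msgCount;
    }
}
```

With this shape, an event that somehow lacks the counter header is still audited as a single message instead of failing.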
[GitHub] [inlong] dockerzhang merged pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic
Posted by GitBox <gi...@apache.org>.
dockerzhang merged PR #5882:
URL: https://github.com/apache/inlong/pull/5882
[GitHub] [inlong] luchunliang commented on pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic
Posted by GitBox <gi...@apache.org>.
luchunliang commented on PR #5882:
URL: https://github.com/apache/inlong/pull/5882#issuecomment-1245365663
The audit item for AUDIT_ID_DATAPROXY_READ_SUCCESS is missing.
[GitHub] [inlong] luchunliang commented on a diff in pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic
Posted by GitBox <gi...@apache.org>.
luchunliang commented on code in PR #5882:
URL: https://github.com/apache/inlong/pull/5882#discussion_r969594146
##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java:
##########
@@ -88,7 +89,12 @@ public static void add(int auditID, Event event) {
String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers);
String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers);
long logTime = getLogTime(headers);
- AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+ long msgCount = 1L;
+ if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
Review Comment:
If the event headers do not contain ConfigConstants.MSG_COUNTER_KEY, the handler needs to decode the event body to get the message count.
[GitHub] [inlong] gosonzhang commented on a diff in pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic
Posted by GitBox <gi...@apache.org>.
gosonzhang commented on code in PR #5882:
URL: https://github.com/apache/inlong/pull/5882#discussion_r970217694
##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java:
##########
@@ -88,7 +89,12 @@ public static void add(int auditID, Event event) {
String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers);
String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers);
long logTime = getLogTime(headers);
- AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+ long msgCount = 1L;
+ if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
Review Comment:
@luchunliang, it is set by the source code.
[GitHub] [inlong] luchunliang commented on a diff in pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic
Posted by GitBox <gi...@apache.org>.
luchunliang commented on code in PR #5882:
URL: https://github.com/apache/inlong/pull/5882#discussion_r970252537
##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java:
##########
@@ -416,23 +423,19 @@ private void formatMessagesAndSend(ChannelHandlerContext ctx, Map<String, String
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
}
}
-
- long pkgTimeInMillis = inLongMsg.getCreatetime();
- String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
-
- if (inLongMsgVer == 4) {
- if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
- pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
- } else {
- pkgTimeStr = dateFormator.get().format(System.currentTimeMillis());
- }
- }
-
- if (commonAttrMap.get(AttributeConstants.DATA_TIME) != null) {
- headers.put(AttributeConstants.DATA_TIME, commonAttrMap.get(AttributeConstants.DATA_TIME));
- } else {
- headers.put(AttributeConstants.DATA_TIME, String.valueOf(System.currentTimeMillis()));
- }
+ // get msgTime
+ long currTIme = System.currentTimeMillis();
+ String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
+ long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
+ LocalDateTime localDateTime =
+ LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
+ String pkgTimeStr = DATE_FORMATTER.format(localDateTime);
+ headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
+ headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
+ // get data time
+ long dtTime = NumberUtils.toLong(
+ commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme);
+ headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime));
Review Comment:
The channel event headers do not contain the "msgTime" parameter.
} else if (msgList != null && commonAttrMap
.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
logger.info("i am in MINUTE_CHECK_DATA");
Map<String, String> headers = new HashMap<String, String>();
headers.put("msgtype", "measure");
headers.put(ConfigConstants.FILE_CHECK_DATA,
"true");
headers.put(AttributeConstants.UNIQ_ID,
commonAttrMap.get(AttributeConstants.UNIQ_ID));
for (ProxyMessage message : msgList) {
byte[] body = message.getData();
Event event = EventBuilder.withBody(body, headers);
if (MessageUtils.isSyncSendForOrder(commonAttrMap
.get(AttributeConstants.MESSAGE_SYNC_SEND))) {
event = new OrderEvent(ctx, event);
}
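
For readers following the added lines in the hunk above, the timestamp handling can be sketched in isolation as below. This is only an illustration under stated assumptions: the header key strings, the `yyyyMMddHHmm` pattern, and the `buildTimeHeaders` and `toLong` helpers are hypothetical, and `toLong` is a standard-library stand-in for the `NumberUtils.toLong(String, long)` call used in the diff.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

final class MsgTimeSketch {
    // Assumed key values; the real constants are not shown in this thread.
    static final String HEADER_KEY_MSG_TIME = "msgTime";
    static final String PKG_TIME_KEY = "pkgTime";
    static final String DATA_TIME = "dt";
    // Assumed pattern for the formatted package time.
    static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmm");

    // Parses a long, falling back to defaultValue for null or malformed input.
    static long toLong(String value, long defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    // Builds the time-related headers, using currTime whenever an attribute is missing or invalid.
    static Map<String, String> buildTimeHeaders(Map<String, String> commonAttrMap,
                                                long currTime, ZoneId zone) {
        Map<String, String> headers = new HashMap<>();
        long pkgTimeInMillis = toLong(commonAttrMap.get(HEADER_KEY_MSG_TIME), currTime);
        LocalDateTime localDateTime =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), zone);
        headers.put(HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
        headers.put(PKG_TIME_KEY, DATE_FORMATTER.format(localDateTime));
        long dtTime = toLong(commonAttrMap.get(DATA_TIME), currTime);
        headers.put(DATA_TIME, String.valueOf(dtTime));
        return headers;
    }
}
```

Passing the current time and zone as parameters keeps the sketch deterministic; the handler in the diff reads them from `System.currentTimeMillis()` and a `defZoneId` field instead.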