Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/13 11:39:17 UTC

[GitHub] [inlong] gosonzhang opened a new pull request, #5882: [INLONG-5880][DataProxy] Add data reporting time process logic

gosonzhang opened a new pull request, #5882:
URL: https://github.com/apache/inlong/pull/5882

   
   - Fixes #5880 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gosonzhang commented on a diff in pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic

Posted by GitBox <gi...@apache.org>.
gosonzhang commented on code in PR #5882:
URL: https://github.com/apache/inlong/pull/5882#discussion_r970217694


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java:
##########
@@ -88,7 +89,12 @@ public static void add(int auditID, Event event) {
             String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers);
             String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers);
             long logTime = getLogTime(headers);
-            AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+            long msgCount = 1L;
+            if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {

Review Comment:
   @luchunliang, this header is set by the source code, so it should never be missing; the default of 1 is only there to guard against extreme cases.





[GitHub] [inlong] dockerzhang merged pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic

Posted by GitBox <gi...@apache.org>.
dockerzhang merged PR #5882:
URL: https://github.com/apache/inlong/pull/5882




[GitHub] [inlong] luchunliang commented on pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic

Posted by GitBox <gi...@apache.org>.
luchunliang commented on PR #5882:
URL: https://github.com/apache/inlong/pull/5882#issuecomment-1245365663

   The audit item for AUDIT_ID_DATAPROXY_READ_SUCCESS is missing.




[GitHub] [inlong] luchunliang commented on a diff in pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic

Posted by GitBox <gi...@apache.org>.
luchunliang commented on code in PR #5882:
URL: https://github.com/apache/inlong/pull/5882#discussion_r969594146


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java:
##########
@@ -88,7 +89,12 @@ public static void add(int auditID, Event event) {
             String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers);
             String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers);
             long logTime = getLogTime(headers);
-            AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+            long msgCount = 1L;
+            if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {

Review Comment:
   If the event header does not contain ConfigConstants.MSG_COUNTER_KEY, the handler needs to decode the event body to get the message count.
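
A minimal sketch of the header lookup under discussion, assuming the count travels as a decimal string in the event headers. The key value "msgcnt", the class name, and the method name are hypothetical (the real value of ConfigConstants.MSG_COUNTER_KEY is not shown in this thread), and a plain Map stands in for the Flume event headers. It only illustrates the default-to-1 behaviour; it does not decode the event body.

```java
import java.util.HashMap;
import java.util.Map;

final class MsgCountSketch {
    // Hypothetical key value; stands in for ConfigConstants.MSG_COUNTER_KEY.
    static final String MSG_COUNTER_KEY = "msgcnt";

    // Returns the message count from the headers, or 1 when the key is
    // absent, not a number, or not positive.
    static long resolveMsgCount(Map<String, String> headers) {
        String raw = headers.get(MSG_COUNTER_KEY);
        if (raw == null) {
            return 1L;
        }
        try {
            long count = Long.parseLong(raw.trim());
            return count > 0 ? count : 1L;
        } catch (NumberFormatException e) {
            return 1L;
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        System.out.println(resolveMsgCount(headers)); // key missing, falls back to 1
        headers.put(MSG_COUNTER_KEY, "25");
        System.out.println(resolveMsgCount(headers)); // uses the header value
    }
}
```

With a default like this the audit count stays at 1 for any event whose source did not set the header, which is the case the review comment is concerned about.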





[GitHub] [inlong] gosonzhang commented on a diff in pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic

Posted by GitBox <gi...@apache.org>.
gosonzhang commented on code in PR #5882:
URL: https://github.com/apache/inlong/pull/5882#discussion_r970217694


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java:
##########
@@ -88,7 +89,12 @@ public static void add(int auditID, Event event) {
             String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers);
             String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers);
             long logTime = getLogTime(headers);
-            AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+            long msgCount = 1L;
+            if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {

Review Comment:
   @luchunliang, it is set by the source code.





[GitHub] [inlong] luchunliang commented on a diff in pull request #5882: [INLONG-5880][DataProxy] Add data reporting time process logic

Posted by GitBox <gi...@apache.org>.
luchunliang commented on code in PR #5882:
URL: https://github.com/apache/inlong/pull/5882#discussion_r970252537


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java:
##########
@@ -416,23 +423,19 @@ private void formatMessagesAndSend(ChannelHandlerContext ctx, Map<String, String
                         inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     }
                 }
-
-                long pkgTimeInMillis = inLongMsg.getCreatetime();
-                String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis);
-
-                if (inLongMsgVer == 4) {
-                    if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
-                        pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY);
-                    } else {
-                        pkgTimeStr = dateFormator.get().format(System.currentTimeMillis());
-                    }
-                }
-
-                if (commonAttrMap.get(AttributeConstants.DATA_TIME) != null) {
-                    headers.put(AttributeConstants.DATA_TIME, commonAttrMap.get(AttributeConstants.DATA_TIME));
-                } else {
-                    headers.put(AttributeConstants.DATA_TIME, String.valueOf(System.currentTimeMillis()));
-                }
+                // get msgTime
+                long currTIme = System.currentTimeMillis();
+                String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
+                long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
+                LocalDateTime localDateTime =
+                        LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
+                String pkgTimeStr = DATE_FORMATTER.format(localDateTime);
+                headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
+                headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
+                // get data time
+                long dtTime = NumberUtils.toLong(
+                        commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme);
+                headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime));

Review Comment:
   The channel event header does not have the "msgTime" parameter:
               } else if (msgList != null && commonAttrMap
                       .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
                   logger.info("i am in MINUTE_CHECK_DATA");
                   Map<String, String> headers = new HashMap<String, String>();
                   headers.put("msgtype", "measure");
                   headers.put(ConfigConstants.FILE_CHECK_DATA,
                           "true");
                   headers.put(AttributeConstants.UNIQ_ID,
                           commonAttrMap.get(AttributeConstants.UNIQ_ID));
                   for (ProxyMessage message : msgList) {
                       byte[] body = message.getData();
                       Event event = EventBuilder.withBody(body, headers);
                       if (MessageUtils.isSyncSendForOrder(commonAttrMap
                               .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
                           event = new OrderEvent(ctx, event);
                       }
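
A rough sketch of the time handling the diff above introduces: parse the reported message time, fall back to the current time when it is missing or malformed, and write both the millisecond value and a formatted package time into the headers. The key values ("msgTime", "pkgTime"), the pattern "yyyyMMddHHmm", and all class and method names are assumptions for illustration; the PR's actual DATE_FORMATTER pattern and constant values are not shown in this thread. A small stdlib helper stands in for NumberUtils.toLong.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

final class MsgTimeSketch {
    // Assumed key values; stand-ins for Constants.HEADER_KEY_MSG_TIME
    // and ConfigConstants.PKG_TIME_KEY.
    static final String HEADER_KEY_MSG_TIME = "msgTime";
    static final String PKG_TIME_KEY = "pkgTime";
    // Assumed pattern; the real one is not visible in the diff.
    static final DateTimeFormatter DATE_FORMATTER =
            DateTimeFormatter.ofPattern("yyyyMMddHHmm");

    // Parses a long, returning the default for null or malformed input.
    static long toLong(String value, long defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    // Builds the time-related headers from the common attributes.
    static Map<String, String> buildTimeHeaders(
            Map<String, String> commonAttrMap, long nowMillis, ZoneId zone) {
        long msgTime = toLong(commonAttrMap.get(HEADER_KEY_MSG_TIME), nowMillis);
        LocalDateTime local =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(msgTime), zone);
        Map<String, String> headers = new HashMap<>();
        headers.put(HEADER_KEY_MSG_TIME, String.valueOf(msgTime));
        headers.put(PKG_TIME_KEY, DATE_FORMATTER.format(local));
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put(HEADER_KEY_MSG_TIME, "1663069157000");
        System.out.println(
                buildTimeHeaders(attrs, System.currentTimeMillis(), ZoneOffset.UTC));
    }
}
```

Any branch that builds its own header map without going through a helper like this would end up without "msgTime", which appears to be the gap the review comment points at.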


