You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2023/01/09 08:52:50 UTC

[GitHub] [inlong] woofyzhao opened a new pull request, #7195: [INLONG-7194][DataProxy] Add index log statistics for MQ sink

woofyzhao opened a new pull request, #7195:
URL: https://github.com/apache/inlong/pull/7195

   #Fix7194
   
   ### Motivation
   
   Missing old index log statistics for simple events. Migrate the old logic from pulsar sink and tube sink.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] luchunliang commented on a diff in pull request #7195: [INLONG-7194][DataProxy] Add index log statistics for MQ sink

Posted by GitBox <gi...@apache.org>.
luchunliang commented on code in PR #7195:
URL: https://github.com/apache/inlong/pull/7195#discussion_r1065414401


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java:
##########
@@ -268,4 +308,40 @@ public void processSendFail(BatchPackProfile currentRecord, String topic, long s
             currentRecord.fail();
         }
     }
+
+    private void appendIndex(Event event, boolean isSuccess) {
+        if (event == null) {
+            return;
+        }
+        if (mqConfig.getStatIntervalSec() <= 0) {
+            return;
+        }
+        // add monitor items base file storage
+        String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
+        String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
+        String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
+        int intMsgCnt = Integer.parseInt(
+                event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
+        long dataTimeL = Long.parseLong(
+                event.getHeaders().get(AttributeConstants.DATA_TIME));
+        Pair<Boolean, String> evenProcType =
+                MessageUtils.getEventProcType(event);
+        // build statistic key
+        StringBuilder newBase = new StringBuilder(512)
+                .append(getSinkName()).append(SEP_HASHTAG).append(topic)
+                .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
+                .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
+                .append(SEP_HASHTAG).append(evenProcType.getRight()).append(SEP_HASHTAG)
+                .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+        // count data
+        if (isSuccess) {
+            monitorIndex.addAndGet(newBase.toString(),
+                    intMsgCnt, 1, event.getBody().length, 0);
+            monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");

Review Comment:
   MQ must not be pulsar.



##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java:
##########
@@ -52,6 +68,10 @@ public class MessageQueueZoneSinkContext extends SinkContext {
     private final CacheClusterConfigHolder cacheHolder;
     private final INLONG_COMPRESSED_TYPE compressType;
 
+    private final MQClusterConfig mqConfig;
+    private MonitorIndex monitorIndex;
+    private MonitorIndexExt monitorIndexExt;

Review Comment:
   No code use the data of monitorIndex and monitorIndexExt.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] woofyzhao commented on a diff in pull request #7195: [INLONG-7194][DataProxy] Add index log statistics for MQ sink

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #7195:
URL: https://github.com/apache/inlong/pull/7195#discussion_r1065433543


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java:
##########
@@ -52,6 +68,10 @@ public class MessageQueueZoneSinkContext extends SinkContext {
     private final CacheClusterConfigHolder cacheHolder;
     private final INLONG_COMPRESSED_TYPE compressType;
 
+    private final MQClusterConfig mqConfig;
+    private MonitorIndex monitorIndex;
+    private MonitorIndexExt monitorIndexExt;

Review Comment:
   It's intended to print statistics to file, index.log.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] luchunliang commented on a diff in pull request #7195: [INLONG-7194][DataProxy] Add index log statistics for MQ sink

Posted by GitBox <gi...@apache.org>.
luchunliang commented on code in PR #7195:
URL: https://github.com/apache/inlong/pull/7195#discussion_r1065419493


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java:
##########
@@ -268,4 +308,40 @@ public void processSendFail(BatchPackProfile currentRecord, String topic, long s
             currentRecord.fail();
         }
     }
+
+    private void appendIndex(Event event, boolean isSuccess) {
+        if (event == null) {
+            return;
+        }
+        if (mqConfig.getStatIntervalSec() <= 0) {
+            return;
+        }
+        // add monitor items base file storage
+        String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);

Review Comment:
   These logic can implement in the user-defined MQ Handler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] woofyzhao commented on pull request #7195: [INLONG-7194][DataProxy] Add index log statistics for MQ sink

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on PR #7195:
URL: https://github.com/apache/inlong/pull/7195#issuecomment-1378252708

   Consider using file listener instead. Close this for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] woofyzhao closed pull request #7195: [INLONG-7194][DataProxy] Add index log statistics for MQ sink

Posted by GitBox <gi...@apache.org>.
woofyzhao closed pull request #7195: [INLONG-7194][DataProxy] Add index log statistics for MQ sink
URL: https://github.com/apache/inlong/pull/7195


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] woofyzhao commented on a diff in pull request #7195: [INLONG-7194][DataProxy] Add index log statistics for MQ sink

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #7195:
URL: https://github.com/apache/inlong/pull/7195#discussion_r1065436683


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java:
##########
@@ -268,4 +308,40 @@ public void processSendFail(BatchPackProfile currentRecord, String topic, long s
             currentRecord.fail();
         }
     }
+
+    private void appendIndex(Event event, boolean isSuccess) {
+        if (event == null) {
+            return;
+        }
+        if (mqConfig.getStatIntervalSec() <= 0) {
+            return;
+        }
+        // add monitor items base file storage
+        String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);

Review Comment:
   This is not MQ specific but message protocol specific, which is handled by common context logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] woofyzhao commented on a diff in pull request #7195: [INLONG-7194][DataProxy] Add index log statistics for MQ sink

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #7195:
URL: https://github.com/apache/inlong/pull/7195#discussion_r1065439273


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java:
##########
@@ -268,4 +308,40 @@ public void processSendFail(BatchPackProfile currentRecord, String topic, long s
             currentRecord.fail();
         }
     }
+
+    private void appendIndex(Event event, boolean isSuccess) {
+        if (event == null) {
+            return;
+        }
+        if (mqConfig.getStatIntervalSec() <= 0) {
+            return;
+        }
+        // add monitor items base file storage
+        String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
+        String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
+        String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
+        int intMsgCnt = Integer.parseInt(
+                event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
+        long dataTimeL = Long.parseLong(
+                event.getHeaders().get(AttributeConstants.DATA_TIME));
+        Pair<Boolean, String> evenProcType =
+                MessageUtils.getEventProcType(event);
+        // build statistic key
+        StringBuilder newBase = new StringBuilder(512)
+                .append(getSinkName()).append(SEP_HASHTAG).append(topic)
+                .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
+                .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
+                .append(SEP_HASHTAG).append(evenProcType.getRight()).append(SEP_HASHTAG)
+                .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+        // count data
+        if (isSuccess) {
+            monitorIndex.addAndGet(newBase.toString(),
+                    intMsgCnt, 1, event.getBody().length, 0);
+            monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org