Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/15 12:16:16 UTC

[GitHub] [inlong] thesumery opened a new pull request, #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

thesumery opened a new pull request, #5906:
URL: https://github.com/apache/inlong/pull/5906

   ### Prepare a Pull Request
   - Fixes #5903 
   
   ### Motivation
   
   *Make the InLong Sort metric code logic clearer.*
   
   ### Modifications
   
   *Refactor the `SinkMetricData` and `SourceMetricData` constructors.*
   *Add `MetricOption` to replace the scattered metric configuration.*
   *Rename the `inlong.audit` configuration to `metrics.audit.proxy.hosts` to stay consistent with the Manager.*
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] thesumery commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r974044872


##########
inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java:
##########
@@ -94,44 +85,20 @@ public RowElasticsearchSinkFunction(
     public void open(RuntimeContext ctx) {
         indexGenerator.open();
         this.runtimeContext = ctx;
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            groupId = inlongMetricArray[0];
-            streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
-            sinkMetricData.registerMetricsForNumBytesOut();
-            sinkMetricData.registerMetricsForNumRecordsOut();
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
-        }
-
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
-        }
-    }
-
-    private void outputMetricForAudit(long size) {
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_OUTPUT,
-                    groupId,
-                    streamId,
-                    System.currentTimeMillis(),
-                    1,
-                    size);
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongGroupStreamNode(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)

Review Comment:
   Is there a bug? The previous logic would output dirty data.





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973986871


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java:
##########
@@ -275,10 +281,17 @@ public class Constants {
             .defaultValue(5)
             .withDescription("minutes");
 
-    public static final ConfigOption<String> METRICS_AUDIT_PROXY_HOSTS = key("metrics.audit.proxy.hosts")
-            .noDefaultValue()
-            .withDescription("Audit proxy host address for reporting audit metrics. "
-                    + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
+    public static final ConfigOption<String> METRICS_LABELS =
+            ConfigOptions.key("inlong.metric.labels")
+                    .noDefaultValue()
+                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");

Review Comment:
   Update the description of METRICS_LABELS.





[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973904019


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java:
##########
@@ -247,19 +229,4 @@ public void outputMetrics(long rowCountSize, long rowDataSize) {
                     rowDataSize);
         }
     }
-
-    @Override
-    public String toString() {
-        return "SourceMetricData{"
-                + "groupId='" + groupId + '\''
-                + ", streamId='" + streamId + '\''
-                + ", nodeId='" + nodeId + '\''
-                + ", numRecordsIn=" + numRecordsIn.getCount()
-                + ", numBytesIn=" + numBytesIn.getCount()
-                + ", numRecordsInForMeter=" + numRecordsInForMeter.getCount()
-                + ", numBytesInForMeter=" + numBytesInForMeter.getCount()
-                + ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate()
-                + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
-                + '}';
-    }

Review Comment:
   Add toString() so that some data can be printed when snapshotState is invoked; it makes troubleshooting metric problems easier.





[GitHub] [inlong] thesumery commented on pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
thesumery commented on PR #5906:
URL: https://github.com/apache/inlong/pull/5906#issuecomment-1249173686

   > pls test with manager and check the result
   
   ![image](https://user-images.githubusercontent.com/107393625/190613992-1ae4aea0-940f-4273-aaea-e3d15f0243a9.png)
   
   
   The Prometheus result is the same as the audit server database:
   ![image](https://user-images.githubusercontent.com/107393625/190611491-32eff6f1-300a-4da2-9233-98080fe1fa1d.png)
   




[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973900026


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java:
##########
@@ -58,31 +57,24 @@ public class SourceMetricData implements MetricData {
     private AuditImp auditImp;
 
     public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
-        this(option.getGroupId(), option.getStreamId(), option.getNodeId(),
-                option.getRegisteredMetric(), metricGroup, option.getIpPorts());
+        this(option.getLabels(), option.getRegisteredMetric(), metricGroup, option.getIpPorts());
     }
 
     public SourceMetricData(
-            String groupId,
-            String streamId,
-            String nodeId,
+            Map<String, String> labels,
             @Nullable RegisteredMetric registeredMetric,
             MetricGroup metricGroup,
             @Nullable String auditHostAndPorts) {
         this.metricGroup = metricGroup;
-        if (groupId != null && streamId != null && nodeId != null) {
-            this.groupId = groupId;
-            this.streamId = streamId;
-            this.nodeId = nodeId;
-            switch (registeredMetric) {
-                default:
-                    registerMetricsForNumRecordsIn();
-                    registerMetricsForNumBytesIn();
-                    registerMetricsForNumBytesInPerSecond();
-                    registerMetricsForNumRecordsInPerSecond();
-                    break;
-
-            }
+        this.labels = labels;
+        switch (registeredMetric) {
+            default:
+                registerMetricsForNumRecordsIn();
+                registerMetricsForNumBytesIn();
+                registerMetricsForNumBytesInPerSecond();
+                registerMetricsForNumRecordsInPerSecond();
+                break;

Review Comment:
   The registration of numRecordsInForMeter and numBytesInForMeter is lost.





[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973995570


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java:
##########
@@ -286,16 +286,17 @@ public void invokeDirty(long rowCount, long rowSize) {
     @Override
     public String toString() {
         return "SinkMetricData{"
-                + "groupId='" + groupId + '\''
-                + ", streamId='" + streamId + '\''
-                + ", nodeId='" + nodeId + '\''
-                + ", numRecordsOut=" + numRecordsOut.getCount()
-                + ", numBytesOut=" + numBytesOut.getCount()
-                + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
-                + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount()
-                + ", dirtyRecords=" + dirtyRecords.getCount()
-                + ", dirtyBytes=" + dirtyBytes.getCount()
-                + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
-                + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
+                + "metricGroup=" + metricGroup
+                + ", labels=" + labels
+                + ", auditImp=" + auditImp
+                + ", numRecordsOut=" + numRecordsOut
+                + ", numBytesOut=" + numBytesOut
+                + ", numRecordsOutForMeter=" + numRecordsOutForMeter
+                + ", numBytesOutForMeter=" + numBytesOutForMeter
+                + ", dirtyRecords=" + dirtyRecords
+                + ", dirtyBytes=" + dirtyBytes
+                + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond
+                + ", numBytesOutPerSecond=" + numBytesOutPerSecond

Review Comment:
   Counter and Meter don't override toString(), so this output is not useful. It needs to print getCount() and getRate() instead.
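
A minimal sketch of the point above, assuming metric types that expose `getCount()` and `getRate()`. The nested `Counter` and `Meter` interfaces, the class name, and the `describe` helper are hypothetical stand-ins so the example compiles on its own; they are not the project's classes. Because metrics may be registered conditionally, the sketch also guards against unregistered (null) metrics.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MetricToStringSketch {

    /** Hypothetical stand-in for a counter metric exposing getCount(). */
    interface Counter {
        long getCount();
    }

    /** Hypothetical stand-in for a meter metric exposing getRate(). */
    interface Meter {
        double getRate();
    }

    /** Renders the metric values rather than the metric objects themselves. */
    static String describe(Map<String, String> labels, Counter numRecordsOut, Meter numRecordsOutPerSecond) {
        return "SinkMetricData{"
                + "labels=" + labels
                // A metric that was never registered is printed as "n/a" instead of throwing.
                + ", numRecordsOut=" + (numRecordsOut == null ? "n/a" : numRecordsOut.getCount())
                + ", numRecordsOutPerSecond="
                + (numRecordsOutPerSecond == null ? "n/a" : numRecordsOutPerSecond.getRate())
                + '}';
    }

    public static void main(String[] args) {
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("groupId", "g1");
        System.out.println(describe(labels, () -> 42L, () -> 1.5));
    }
}
```

Printing the objects directly would fall back to `Object.toString()` (class name plus hash code) for any type that does not override it, which is why the values are read explicitly here.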





[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973996813


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java:
##########
@@ -229,4 +222,19 @@ public void outputMetrics(long rowCountSize, long rowDataSize) {
                     rowDataSize);
         }
     }
+
+    @Override
+    public String toString() {
+        return "SourceMetricData{"
+                + "metricGroup=" + metricGroup
+                + ", labels=" + labels
+                + ", numRecordsIn=" + numRecordsIn
+                + ", numBytesIn=" + numBytesIn
+                + ", numRecordsInForMeter=" + numRecordsInForMeter
+                + ", numBytesInForMeter=" + numBytesInForMeter
+                + ", numRecordsInPerSecond=" + numRecordsInPerSecond
+                + ", numBytesInPerSecond=" + numBytesInPerSecond

Review Comment:
   This has the same problem.





[GitHub] [inlong] thesumery commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r974047625


##########
inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java:
##########
@@ -94,44 +85,20 @@ public RowElasticsearchSinkFunction(
     public void open(RuntimeContext ctx) {
         indexGenerator.open();
         this.runtimeContext = ctx;
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            groupId = inlongMetricArray[0];
-            streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
-            sinkMetricData.registerMetricsForNumBytesOut();
-            sinkMetricData.registerMetricsForNumRecordsOut();
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
-        }
-
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
-        }
-    }
-
-    private void outputMetricForAudit(long size) {
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_OUTPUT,
-                    groupId,
-                    streamId,
-                    System.currentTimeMillis(),
-                    1,
-                    size);
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongGroupStreamNode(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)

Review Comment:
   OK, I will change it to RegisteredMetric.NORMAL.





[GitHub] [inlong] thesumery commented on pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
thesumery commented on PR #5906:
URL: https://github.com/apache/inlong/pull/5906#issuecomment-1250864705

   > 
   
   fixed
   




[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973893049


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java:
##########
@@ -34,31 +40,33 @@ public class MetricOption {
             + "3}|65[0-4]\\d{"
             + "2}|655[0-2]\\d|6553[0-5])$";
 
-    private String groupId;
-    private String streamId;
-    private String nodeId;
+    private Map<String, String> labels;
     private final HashSet<String> ipPortList;
     private String ipPorts;
     private RegisteredMetric registeredMetric;
 
     private MetricOption(
-            String inlongGroupStreamNode,
+            String inlongLabels,
             @Nullable String inlongAudit,
             @Nullable RegisteredMetric registeredMetric) {
-        Preconditions.checkNotNull(inlongGroupStreamNode,
-                "Inlong group stream node must be set for register metric.");
-        if (inlongGroupStreamNode != null) {
-            String[] inLongGroupStreamNodeArray = inlongGroupStreamNode.split(DELIMITER);
-            Preconditions.checkArgument(inLongGroupStreamNodeArray.length == 3,
-                    "Error inLong metric format: " + inlongGroupStreamNode);
-            this.groupId = inLongGroupStreamNodeArray[0];
-            this.streamId = inLongGroupStreamNodeArray[1];
-            this.nodeId = inLongGroupStreamNodeArray[2];
-        }
+        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
+                "Inlong labels must be set for register metric.");
+
+        this.labels = new HashMap<>();

Review Comment:
   It should use LinkedHashMap to keep the order of the full metric name.
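
A rough sketch of why the map type matters, assuming labels arrive as a `key=value` list joined by `&`. The class name, the `parseLabels` helper, and the sample keys `groupId`, `streamId`, and `nodeId` are illustrative assumptions, not the code from this PR. A `LinkedHashMap` iterates in insertion order, so a name built from the label values comes out in the order the labels were written; a plain `HashMap` gives no such guarantee.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LabelParserSketch {

    /** Parses "k1=v1&k2=v2" into a map that preserves the order of the input. */
    static Map<String, String> parseLabels(String inlongLabels) {
        if (inlongLabels == null || inlongLabels.trim().isEmpty()) {
            throw new IllegalArgumentException("Inlong labels must be set for register metric.");
        }
        Map<String, String> labels = new LinkedHashMap<>();
        for (String pair : inlongLabels.split("&")) {
            int idx = pair.indexOf('=');
            // Reject entries without '=', with an empty key, or with an empty value.
            if (idx <= 0 || idx == pair.length() - 1) {
                throw new IllegalArgumentException("Error inLong metric format: " + inlongLabels);
            }
            labels.put(pair.substring(0, idx).trim(), pair.substring(idx + 1).trim());
        }
        return labels;
    }

    public static void main(String[] args) {
        Map<String, String> labels = parseLabels("groupId=g1&streamId=s1&nodeId=n1");
        // Values are joined in insertion order.
        System.out.println(String.join(".", labels.values()));
    }
}
```

With this helper, a malformed entry such as `key2&value2` is rejected up front instead of silently producing a wrong label.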





[GitHub] [inlong] gong commented on pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
gong commented on PR #5906:
URL: https://github.com/apache/inlong/pull/5906#issuecomment-1250621038

   The new MySQL source also needs to be modified to use the metric option.




[GitHub] [inlong] gong commented on pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
gong commented on PR #5906:
URL: https://github.com/apache/inlong/pull/5906#issuecomment-1250795247

   ```java
       public void invoke(long rowCount, long rowSize) {
           if (numRecordsOut != null) {
               numRecordsOut.inc(rowCount);
           }
           if (numBytesOut != null) {
               numBytesOut.inc(rowSize);
           }
           if (auditImp != null) {
               auditImp.add(
                       Constants.AUDIT_SORT_OUTPUT,
                       getGroupId(),
                       getStreamId(),
                       System.currentTimeMillis(),
                       rowCount,
                       rowSize);
           }
       }
   ```
   The updates to numRecordsOutForMeter and numBytesOutForMeter are lost here.
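
One way the missing updates might look, as a sketch only. `SimpleCounter` and `SinkInvokeSketch` are hypothetical stand-ins rather than the project's classes, the audit call from the quoted method is left out, and the idea that the `ForMeter` counters feed the per-second meters is an assumption based on their names.

```java
public class SinkInvokeSketch {

    /** Hypothetical minimal counter standing in for the real metric type. */
    static final class SimpleCounter {
        private long count;

        void inc(long n) {
            count += n;
        }

        long getCount() {
            return count;
        }
    }

    final SimpleCounter numRecordsOut = new SimpleCounter();
    final SimpleCounter numBytesOut = new SimpleCounter();
    // Assumed to feed the per-second meters; null when those meters are not registered.
    final SimpleCounter numRecordsOutForMeter;
    final SimpleCounter numBytesOutForMeter;

    SinkInvokeSketch(boolean registerMeters) {
        this.numRecordsOutForMeter = registerMeters ? new SimpleCounter() : null;
        this.numBytesOutForMeter = registerMeters ? new SimpleCounter() : null;
    }

    public void invoke(long rowCount, long rowSize) {
        numRecordsOut.inc(rowCount);
        numBytesOut.inc(rowSize);
        // Keep the meter-backing counters in step with the plain counters.
        if (numRecordsOutForMeter != null) {
            numRecordsOutForMeter.inc(rowCount);
        }
        if (numBytesOutForMeter != null) {
            numBytesOutForMeter.inc(rowSize);
        }
    }

    public static void main(String[] args) {
        SinkInvokeSketch sink = new SinkInvokeSketch(true);
        sink.invoke(2, 100);
        sink.invoke(3, 50);
        System.out.println(sink.numRecordsOutForMeter.getCount());
        System.out.println(sink.numBytesOutForMeter.getCount());
    }
}
```

If the meter-backing counters are never incremented, any rate derived from them would stay at zero even while the plain counters grow.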




[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973908263


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##########
@@ -82,10 +82,11 @@ public final class Constants {
     public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
 
     public static final ConfigOption<String> INLONG_METRIC =
-        ConfigOptions.key("inlong.group_stream_node")
+        ConfigOptions.key("inlong.metric.labels")
             .stringType()
             .noDefaultValue()
-            .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+            .withDescription("INLONG metric labels, format is 'key1=value1&key2&value2',"

Review Comment:
   The description is wrong; the format is 'key1=value1&key2=value2&key3=value3'.





[GitHub] [inlong] thesumery commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973896585


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java:
##########
@@ -34,31 +40,33 @@ public class MetricOption {
             + "3}|65[0-4]\\d{"
             + "2}|655[0-2]\\d|6553[0-5])$";
 
-    private String groupId;
-    private String streamId;
-    private String nodeId;
+    private Map<String, String> labels;
     private final HashSet<String> ipPortList;
     private String ipPorts;
     private RegisteredMetric registeredMetric;
 
     private MetricOption(
-            String inlongGroupStreamNode,
+            String inlongLabels,
             @Nullable String inlongAudit,
             @Nullable RegisteredMetric registeredMetric) {
-        Preconditions.checkNotNull(inlongGroupStreamNode,
-                "Inlong group stream node must be set for register metric.");
-        if (inlongGroupStreamNode != null) {
-            String[] inLongGroupStreamNodeArray = inlongGroupStreamNode.split(DELIMITER);
-            Preconditions.checkArgument(inLongGroupStreamNodeArray.length == 3,
-                    "Error inLong metric format: " + inlongGroupStreamNode);
-            this.groupId = inLongGroupStreamNodeArray[0];
-            this.streamId = inLongGroupStreamNodeArray[1];
-            this.nodeId = inLongGroupStreamNodeArray[2];
-        }
+        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
+                "Inlong labels must be set for register metric.");
+
+        this.labels = new HashMap<>();

Review Comment:
   OK





[GitHub] [inlong] Oneal65 commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r972704659


##########
inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java:
##########
@@ -94,44 +85,20 @@ public RowElasticsearchSinkFunction(
     public void open(RuntimeContext ctx) {
         indexGenerator.open();
         this.runtimeContext = ctx;
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            groupId = inlongMetricArray[0];
-            streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
-            sinkMetricData.registerMetricsForNumBytesOut();
-            sinkMetricData.registerMetricsForNumRecordsOut();
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
-        }
-
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
-        }
-    }
-
-    private void outputMetricForAudit(long size) {
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_OUTPUT,
-                    groupId,
-                    streamId,
-                    System.currentTimeMillis(),
-                    1,
-                    size);
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongGroupStreamNode(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)

Review Comment:
   This should not register DirtyBytes and DirtyRecords.





[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973909779


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java:
##########
@@ -280,6 +281,12 @@ public class Constants {
             .defaultValue(5)
             .withDescription("minutes");
 
+    public static final ConfigOption<String> METRICS_LABELS =
+            ConfigOptions.key("inlong.metric.label")
+                    .noDefaultValue()
+                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+
+

Review Comment:
   This differs from `INLONG_METRIC` in `inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java`.





[GitHub] [inlong] dockerzhang merged pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
dockerzhang merged PR #5906:
URL: https://github.com/apache/inlong/pull/5906




[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r972569971


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java:
##########
@@ -55,21 +56,40 @@ public class SinkMetricData implements MetricData {
     private Meter numRecordsOutPerSecond;
     private Meter numBytesOutPerSecond;
 
-    public SinkMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
-        this(groupId, streamId, nodeId, metricGroup, null);
-    }
-
     public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
-        this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
+        this(option.getGroupId(), option.getStreamId(), option.getNodeId(),
+                option.getRegisteredMetric(), metricGroup, option.getIpPorts());
     }
 
     public SinkMetricData(
-            String groupId, String streamId, String nodeId, MetricGroup metricGroup,
+            @Nullable String groupId,
+            @Nullable String streamId,
+            @Nullable String nodeId,
+            @Nullable RegisteredMetric registeredMetric,
+            MetricGroup metricGroup,
             @Nullable String auditHostAndPorts) {
         this.metricGroup = metricGroup;
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.nodeId = nodeId;
+        if (groupId != null && streamId != null && nodeId != null) {
+            this.groupId = groupId;
+            this.streamId = streamId;
+            this.nodeId = nodeId;
+            if (RegisteredMetric.ALL.equals(registeredMetric)) {

Review Comment:
   RegisteredMetric.ALL.equals(registeredMetric) -> RegisteredMetric.ALL == registeredMetric
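
A small sketch of the suggested comparison style. The enum constants mirror the ones in this diff, but the class name, the helper methods, and the treatment of `null` as "register nothing" are assumptions made for illustration. Enum constants are singletons, so `==` compares them correctly, never throws on a `null` operand, and is type-checked at compile time.

```java
public class RegisteredMetricSketch {

    enum RegisteredMetric {
        ALL,
        NORMAL,
        DIRTY
    }

    /** True when the dirty-data metrics should be registered (assumed semantics). */
    static boolean registersDirty(RegisteredMetric registeredMetric) {
        return registeredMetric == RegisteredMetric.ALL || registeredMetric == RegisteredMetric.DIRTY;
    }

    /** True when the regular in/out metrics should be registered (assumed semantics). */
    static boolean registersNormal(RegisteredMetric registeredMetric) {
        return registeredMetric == RegisteredMetric.ALL || registeredMetric == RegisteredMetric.NORMAL;
    }

    public static void main(String[] args) {
        System.out.println(registersDirty(RegisteredMetric.NORMAL));
        System.out.println(registersNormal(null));
    }
}
```

By contrast, a `switch` on a `null` enum value throws a `NullPointerException`, which matters when the parameter is annotated `@Nullable`.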



##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java:
##########
@@ -86,4 +91,49 @@ public HashSet<String> getIpPortList() {
     public String getIpPorts() {
         return ipPorts;
     }
+
+    public RegisteredMetric getRegisteredMetric() {
+        return registeredMetric;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public enum RegisteredMetric {
+        ALL,
+        NORMAL,
+        DIRTY
+    }
+
+    public static class Builder {
+        private String inLongMetric;
+        private String inLongAudit;

Review Comment:
   inLongAudit -> inlongAudit



##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java:
##########
@@ -41,44 +43,44 @@
  */
 public class SourceMetricData implements MetricData {
 
-    private final MetricGroup metricGroup;
-    private final String groupId;
-    private final String streamId;
-    private final String nodeId;
+    private MetricGroup metricGroup;
+    private String groupId;
+    private String streamId;
+    private String nodeId;
     private Counter numRecordsIn;
     private Counter numBytesIn;
     private Meter numRecordsInPerSecond;
     private Meter numBytesInPerSecond;
-    private final AuditImp auditImp;
-
-    public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
-        this(groupId, streamId, nodeId, metricGroup, (AuditImp) null);
-    }
+    private AuditImp auditImp;
 
     public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
-        this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
+        this(option.getGroupId(), option.getStreamId(), option.getNodeId(),
+                option.getRegisteredMetric(), metricGroup, option.getIpPorts());
     }
 
-    public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
-            AuditImp auditImp) {
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.nodeId = nodeId;
-        this.metricGroup = metricGroup;
-        this.auditImp = auditImp;
-    }
-
-    public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
+    public SourceMetricData(
+            @Nullable String groupId,
+            @Nullable String streamId,
+            @Nullable String nodeId,
+            @Nullable RegisteredMetric registeredMetric,
+            MetricGroup metricGroup,
             @Nullable String auditHostAndPorts) {
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.nodeId = nodeId;
         this.metricGroup = metricGroup;
+        if (groupId != null && streamId != null && nodeId != null) {
+            this.groupId = groupId;
+            this.streamId = streamId;
+            this.nodeId = nodeId;
+            if (RegisteredMetric.ALL.equals(registeredMetric)) {

Review Comment:
   Prefer `==` for the enum comparison: `RegisteredMetric.ALL.equals(registeredMetric)` -> `RegisteredMetric.ALL == registeredMetric`



##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java:
##########
@@ -55,21 +56,40 @@ public class SinkMetricData implements MetricData {
     private Meter numRecordsOutPerSecond;
     private Meter numBytesOutPerSecond;
 
-    public SinkMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
-        this(groupId, streamId, nodeId, metricGroup, null);
-    }
-
     public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
-        this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
+        this(option.getGroupId(), option.getStreamId(), option.getNodeId(),
+                option.getRegisteredMetric(), metricGroup, option.getIpPorts());
     }
 
     public SinkMetricData(
-            String groupId, String streamId, String nodeId, MetricGroup metricGroup,
+            @Nullable String groupId,
+            @Nullable String streamId,
+            @Nullable String nodeId,
+            @Nullable RegisteredMetric registeredMetric,
+            MetricGroup metricGroup,
             @Nullable String auditHostAndPorts) {
         this.metricGroup = metricGroup;
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.nodeId = nodeId;
+        if (groupId != null && streamId != null && nodeId != null) {
+            this.groupId = groupId;
+            this.streamId = streamId;
+            this.nodeId = nodeId;
+            if (RegisteredMetric.ALL.equals(registeredMetric)) {

Review Comment:
   Consider using a `switch` on `registeredMetric` here instead of `if`.
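
A rough sketch of what the `switch` form might look like, under stated assumptions: the branch bodies are not visible in the diff, so the returned strings below are placeholders only, and the class and method names are hypothetical. One point worth keeping in mind is that `registeredMetric` is marked `@Nullable` in the diff, and a `switch` on a `null` enum reference throws `NullPointerException`, so a guard is needed before switching.

```java
// Hypothetical stand-alone sketch; RegisteredMetric mirrors the enum in the diff.
class RegisteredMetricSwitchSketch {

    enum RegisteredMetric { ALL, NORMAL, DIRTY }

    // Returns a placeholder label per branch. The real branch bodies are not
    // shown in the diff, so these strings are illustrative assumptions only.
    static String selectBranch(RegisteredMetric registeredMetric) {
        // Switching on a null enum throws NullPointerException, so handle
        // the @Nullable case explicitly first.
        if (registeredMetric == null) {
            return "none";
        }
        switch (registeredMetric) {
            case ALL:
                return "all";
            case NORMAL:
                return "normal";
            case DIRTY:
                return "dirty";
            default:
                // Guards against enum constants added later.
                throw new IllegalStateException("Unexpected value: " + registeredMetric);
        }
    }

    public static void main(String[] args) {
        System.out.println(selectBranch(RegisteredMetric.ALL));
        System.out.println(selectBranch(null));
    }
}
```

How the `null` case should actually be treated (skip registration, fall back to a default, or reject) is a decision for the PR author; the sketch only shows where the guard would sit.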



##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java:
##########
@@ -86,4 +91,49 @@ public HashSet<String> getIpPortList() {
     public String getIpPorts() {
         return ipPorts;
     }
+
+    public RegisteredMetric getRegisteredMetric() {
+        return registeredMetric;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public enum RegisteredMetric {
+        ALL,
+        NORMAL,
+        DIRTY
+    }
+
+    public static class Builder {
+        private String inLongMetric;

Review Comment:
   Rename: `inLongMetric` -> `inlongMetric`


