You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/08 01:57:13 UTC

[inlong] branch master updated: [INLONG-5818][Sort] Fix metric computing for Pulsar connector (#5819)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d750578d [INLONG-5818][Sort] Fix metric computing for Pulsar connector (#5819)
1d750578d is described below

commit 1d750578db37bda3dd133fb757d000723c8b032c
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Thu Sep 8 09:57:09 2022 +0800

    [INLONG-5818][Sort] Fix metric computing for Pulsar connector (#5819)
---
 .../table/DynamicPulsarDeserializationSchema.java  | 27 +++-------------------
 .../inlongmsg/InLongMsgDeserializationSchema.java  |  7 ++++--
 2 files changed, 8 insertions(+), 26 deletions(-)

diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 7522a2e43..6d88a303b 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -34,7 +34,6 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
 import org.apache.pulsar.client.api.Message;
@@ -45,11 +44,11 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
@@ -188,15 +187,13 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
 
     @Override
     public void deserialize(Message<RowData> message, Collector<RowData> collector) throws IOException {
-        AtomicLong counter = new AtomicLong();
         // shortcut in case no output projection is required,
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
             valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
-                counter.addAndGet(1L);
+                sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length);
                 collector.collect(inputRow);
             }));
-            outputMetrics(counter, message);
             return;
         }
         BufferingCollector keyCollector = new BufferingCollector();
@@ -215,32 +212,14 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             outputCollector.collect(null);
         } else {
             valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
-                counter.addAndGet(1L);
+                sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length);
                 outputCollector.collect(inputRow);
             }));
-            outputMetrics(counter, message);
         }
 
         keyCollector.buffer.clear();
     }
 
-    private void outputMetrics(AtomicLong counter, Message<RowData> message) {
-        if (sourceMetricData != null) {
-            sourceMetricData.getNumRecordsIn().inc(counter.get());
-            sourceMetricData.getNumBytesIn()
-                    .inc(message.getData().length);
-        }
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_INPUT,
-                    inlongGroupId,
-                    inlongStreamId,
-                    System.currentTimeMillis(),
-                    counter.get(),
-                    message.getData().length);
-        }
-    }
-
     @Override
     public TypeInformation<RowData> getProducedType() {
         return producedTypeInfo;
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
index d14c982e9..0e15b847e 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
@@ -18,7 +18,6 @@
 
 package org.apache.inlong.sort.formats.inlongmsg;
 
-import com.esotericsoftware.minlog.Log;
 import com.google.common.base.Objects;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -27,6 +26,8 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.common.msg.InLongMsg;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -38,6 +39,8 @@ import java.util.stream.Collectors;
 
 public class InLongMsgDeserializationSchema implements DeserializationSchema<RowData> {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(InLongMsgDeserializationSchema.class);
+
     /** Inner {@link DeserializationSchema} to deserialize {@link InLongMsg} inner packaged
      *  data buffer message */
     private final DeserializationSchema<RowData> deserializationSchema;
@@ -83,7 +86,7 @@ public class InLongMsgDeserializationSchema implements DeserializationSchema<Row
                 head = InLongMsgUtils.parseHead(attr);
             } catch (Throwable t) {
                 if (ignoreErrors) {
-                    Log.warn("Ignore inlong msg attr({})parse error.", attr, t);
+                    LOGGER.warn("Ignore inlong msg attr({})parse error.", attr, t);
                     continue;
                 }
                 throw new IOException(