You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/08 01:57:13 UTC
[inlong] branch master updated: [INLONG-5818][Sort] Fix metric computing for Pulsar connector (#5819)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1d750578d [INLONG-5818][Sort] Fix metric computing for Pulsar connector (#5819)
1d750578d is described below
commit 1d750578db37bda3dd133fb757d000723c8b032c
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Thu Sep 8 09:57:09 2022 +0800
[INLONG-5818][Sort] Fix metric computing for Pulsar connector (#5819)
---
.../table/DynamicPulsarDeserializationSchema.java | 27 +++-------------------
.../inlongmsg/InLongMsgDeserializationSchema.java | 7 ++++--
2 files changed, 8 insertions(+), 26 deletions(-)
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 7522a2e43..6d88a303b 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -34,7 +34,6 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
import org.apache.pulsar.client.api.Message;
@@ -45,11 +44,11 @@ import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
import static org.apache.inlong.sort.base.Constants.DELIMITER;
@@ -188,15 +187,13 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
@Override
public void deserialize(Message<RowData> message, Collector<RowData> collector) throws IOException {
- AtomicLong counter = new AtomicLong();
// shortcut in case no output projection is required,
// also not for a cartesian product with the keys
if (keyDeserialization == null && !hasMetadata) {
valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
- counter.addAndGet(1L);
+ sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length);
collector.collect(inputRow);
}));
- outputMetrics(counter, message);
return;
}
BufferingCollector keyCollector = new BufferingCollector();
@@ -215,32 +212,14 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
outputCollector.collect(null);
} else {
valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
- counter.addAndGet(1L);
+ sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length);
outputCollector.collect(inputRow);
}));
- outputMetrics(counter, message);
}
keyCollector.buffer.clear();
}
- private void outputMetrics(AtomicLong counter, Message<RowData> message) {
- if (sourceMetricData != null) {
- sourceMetricData.getNumRecordsIn().inc(counter.get());
- sourceMetricData.getNumBytesIn()
- .inc(message.getData().length);
- }
- if (auditImp != null) {
- auditImp.add(
- Constants.AUDIT_SORT_INPUT,
- inlongGroupId,
- inlongStreamId,
- System.currentTimeMillis(),
- counter.get(),
- message.getData().length);
- }
- }
-
@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
index d14c982e9..0e15b847e 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
@@ -18,7 +18,6 @@
package org.apache.inlong.sort.formats.inlongmsg;
-import com.esotericsoftware.minlog.Log;
import com.google.common.base.Objects;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -27,6 +26,8 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.inlong.common.msg.InLongMsg;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -38,6 +39,8 @@ import java.util.stream.Collectors;
public class InLongMsgDeserializationSchema implements DeserializationSchema<RowData> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(InLongMsgDeserializationSchema.class);
+
/** Inner {@link DeserializationSchema} to deserialize {@link InLongMsg} inner packaged
* data buffer message */
private final DeserializationSchema<RowData> deserializationSchema;
@@ -83,7 +86,7 @@ public class InLongMsgDeserializationSchema implements DeserializationSchema<Row
head = InLongMsgUtils.parseHead(attr);
} catch (Throwable t) {
if (ignoreErrors) {
- Log.warn("Ignore inlong msg attr({})parse error.", attr, t);
+ LOGGER.warn("Ignore inlong msg attr({})parse error.", attr, t);
continue;
}
throw new IOException(