You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/02 06:14:09 UTC
[inlong] branch master updated: [INLONG-5762][Sort] Fix the computing for the Pulsar source metric (#5763)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b9676791b [INLONG-5762][Sort] Fix the computing for the Pulsar source metric (#5763)
b9676791b is described below
commit b9676791b8cbdd0550a148a1083dc5b06055d383
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Fri Sep 2 14:14:03 2022 +0800
[INLONG-5762][Sort] Fix the computing for the Pulsar source metric (#5763)
---
.../table/DynamicPulsarDeserializationSchema.java | 79 ++++++++++++++++++----
.../pulsar/withoutadmin/CallbackCollector.java | 47 +++++++++++++
2 files changed, 111 insertions(+), 15 deletions(-)
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 38a97bf5c..7522a2e43 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -18,10 +18,11 @@
package org.apache.inlong.sort.pulsar.table;
-import java.util.Arrays;
-import java.util.HashSet;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
import org.apache.flink.streaming.util.serialization.FlinkSchema;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
@@ -35,14 +36,21 @@ import org.apache.flink.util.Preconditions;
import org.apache.inlong.audit.AuditImp;
import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
import static org.apache.inlong.sort.base.Constants.DELIMITER;
/**
@@ -118,7 +126,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
inlongGroupId = inlongMetricArray[0];
inlongStreamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
- sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup());
+ sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, getMetricGroup(context));
sourceMetricData.registerMetricsForNumBytesIn();
sourceMetricData.registerMetricsForNumBytesInPerSecond();
sourceMetricData.registerMetricsForNumRecordsIn();
@@ -132,6 +140,40 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
}
+ /**
+ * Gets the metric group via reflection on the adapter's runtimeContext field, else from the context itself.
+ *
+ * @param context Contextual information that can be used during initialization.
+ * @return metric group that can be used to register new metrics with Flink and to create a nested hierarchy based
+ * on the group names.
+ */
+ private MetricGroup getMetricGroup(DeserializationSchema.InitializationContext context)
+ throws NoSuchFieldException, IllegalAccessException {
+ MetricGroup metricGroup;
+ String className = "RuntimeContextDeserializationInitializationContextAdapter";
+ String fieldName = "runtimeContext";
+ Class runtimeContextDeserializationInitializationContextAdapter = null;
+ Class[] innerClazz = RuntimeContextInitializationContextAdapters.class.getDeclaredClasses();
+ for (Class clazz : innerClazz) {
+ int mod = clazz.getModifiers();
+ if (Modifier.isPrivate(mod)) {
+ if (className.equalsIgnoreCase(clazz.getSimpleName())) {
+ runtimeContextDeserializationInitializationContextAdapter = clazz;
+ break;
+ }
+ }
+ }
+ if (runtimeContextDeserializationInitializationContextAdapter != null) {
+ Field field = runtimeContextDeserializationInitializationContextAdapter.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ RuntimeContext runtimeContext = (RuntimeContext) field.get(context);
+ metricGroup = runtimeContext.getMetricGroup();
+ } else {
+ metricGroup = context.getMetricGroup();
+ }
+ return metricGroup;
+ }
+
@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
@@ -146,11 +188,15 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
@Override
public void deserialize(Message<RowData> message, Collector<RowData> collector) throws IOException {
+ AtomicLong counter = new AtomicLong();
// shortcut in case no output projection is required,
// also not for a cartesian product with the keys
if (keyDeserialization == null && !hasMetadata) {
- valueDeserialization.deserialize(message.getData(), collector);
- outputMetrics(message);
+ valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
+ counter.addAndGet(1L);
+ collector.collect(inputRow);
+ }));
+ outputMetrics(counter, message);
return;
}
BufferingCollector keyCollector = new BufferingCollector();
@@ -168,27 +214,30 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
- valueDeserialization.deserialize(message.getData(), outputCollector);
- outputMetrics(message);
+ valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
+ counter.addAndGet(1L);
+ outputCollector.collect(inputRow);
+ }));
+ outputMetrics(counter, message);
}
keyCollector.buffer.clear();
}
- private void outputMetrics(Message<RowData> message) {
+ private void outputMetrics(AtomicLong counter, Message<RowData> message) {
if (sourceMetricData != null) {
- sourceMetricData.getNumRecordsIn().inc(1L);
+ sourceMetricData.getNumRecordsIn().inc(counter.get());
sourceMetricData.getNumBytesIn()
.inc(message.getData().length);
}
if (auditImp != null) {
auditImp.add(
- Constants.AUDIT_SORT_INPUT,
- inlongGroupId,
- inlongStreamId,
- System.currentTimeMillis(),
- 1,
- message.getData().length);
+ Constants.AUDIT_SORT_INPUT,
+ inlongGroupId,
+ inlongStreamId,
+ System.currentTimeMillis(),
+ counter.get(),
+ message.getData().length);
}
}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
new file mode 100644
index 000000000..e61b7c3f7
--- /dev/null
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.withoutadmin;
+
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * A {@link Collector} that passes every collected record to the supplied callback.
+ */
+public class CallbackCollector<T> implements Collector<T> {
+
+ private final ThrowingConsumer<T, Exception> callback;
+
+ public CallbackCollector(ThrowingConsumer<T, Exception> callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public void collect(T t) {
+ try {
+ callback.accept(t);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+}