You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/02 06:14:09 UTC

[inlong] branch master updated: [INLONG-5762][Sort] Fix the computing for the Pulsar source metric (#5763)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b9676791b [INLONG-5762][Sort] Fix the computing for the Pulsar source metric (#5763)
b9676791b is described below

commit b9676791b8cbdd0550a148a1083dc5b06055d383
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Fri Sep 2 14:14:03 2022 +0800

    [INLONG-5762][Sort] Fix the computing for the Pulsar source metric (#5763)
---
 .../table/DynamicPulsarDeserializationSchema.java  | 79 ++++++++++++++++++----
 .../pulsar/withoutadmin/CallbackCollector.java     | 47 +++++++++++++
 2 files changed, 111 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 38a97bf5c..7522a2e43 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -18,10 +18,11 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
-import java.util.Arrays;
-import java.util.HashSet;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
 import org.apache.flink.streaming.util.serialization.FlinkSchema;
 import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
@@ -35,14 +36,21 @@ import org.apache.flink.util.Preconditions;
 import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
@@ -118,7 +126,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             inlongGroupId = inlongMetricArray[0];
             inlongStreamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup());
+            sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, getMetricGroup(context));
             sourceMetricData.registerMetricsForNumBytesIn();
             sourceMetricData.registerMetricsForNumBytesInPerSecond();
             sourceMetricData.registerMetricsForNumRecordsIn();
@@ -132,6 +140,40 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
 
     }
 
+    /**
+     * Resolves the metric group from the RuntimeContext wrapped by Flink's private adapter class, via reflection.
+     * @param context initialization context whose wrapped {@code runtimeContext} field is read reflectively.
+     * @return the wrapped RuntimeContext's metric group, or {@code context.getMetricGroup()} as a fallback.
+     * @throws NoSuchFieldException if the adapter class declares no {@code runtimeContext} field.
+     * @throws IllegalAccessException if the {@code runtimeContext} field cannot be accessed.
+     */
+    private MetricGroup getMetricGroup(DeserializationSchema.InitializationContext context)
+            throws NoSuchFieldException, IllegalAccessException {
+        MetricGroup metricGroup;
+        String className = "RuntimeContextDeserializationInitializationContextAdapter"; // private nested class to find
+        String fieldName = "runtimeContext";
+        Class runtimeContextDeserializationInitializationContextAdapter = null;
+        Class[] innerClazz = RuntimeContextInitializationContextAdapters.class.getDeclaredClasses();
+        for (Class clazz : innerClazz) {
+            int mod = clazz.getModifiers();
+            if (Modifier.isPrivate(mod)) {
+                if (className.equalsIgnoreCase(clazz.getSimpleName())) {
+                    runtimeContextDeserializationInitializationContextAdapter = clazz;
+                    break;
+                }
+            }
+        }
+        if (runtimeContextDeserializationInitializationContextAdapter != null) {
+            Field field = runtimeContextDeserializationInitializationContextAdapter.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            RuntimeContext runtimeContext = (RuntimeContext) field.get(context); // NOTE(review): no isInstance check
+            metricGroup = runtimeContext.getMetricGroup();
+        } else { // adapter class not found: use the context's own metric group
+            metricGroup = context.getMetricGroup();
+        }
+        return metricGroup;
+    }
+
     @Override
     public boolean isEndOfStream(RowData nextElement) {
         return false;
@@ -146,11 +188,15 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
 
     @Override
     public void deserialize(Message<RowData> message, Collector<RowData> collector) throws IOException {
+        AtomicLong counter = new AtomicLong();
         // shortcut in case no output projection is required,
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
-            valueDeserialization.deserialize(message.getData(), collector);
-            outputMetrics(message);
+            valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
+                counter.addAndGet(1L);
+                collector.collect(inputRow);
+            }));
+            outputMetrics(counter, message);
             return;
         }
         BufferingCollector keyCollector = new BufferingCollector();
@@ -168,27 +214,30 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
         } else {
-            valueDeserialization.deserialize(message.getData(), outputCollector);
-            outputMetrics(message);
+            valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
+                counter.addAndGet(1L);
+                outputCollector.collect(inputRow);
+            }));
+            outputMetrics(counter, message);
         }
 
         keyCollector.buffer.clear();
     }
 
-    private void outputMetrics(Message<RowData> message) {
+    private void outputMetrics(AtomicLong counter, Message<RowData> message) {
         if (sourceMetricData != null) {
-            sourceMetricData.getNumRecordsIn().inc(1L);
+            sourceMetricData.getNumRecordsIn().inc(counter.get());
             sourceMetricData.getNumBytesIn()
                     .inc(message.getData().length);
         }
         if (auditImp != null) {
             auditImp.add(
-                Constants.AUDIT_SORT_INPUT,
-                inlongGroupId,
-                inlongStreamId,
-                System.currentTimeMillis(),
-                1,
-                message.getData().length);
+                    Constants.AUDIT_SORT_INPUT,
+                    inlongGroupId,
+                    inlongStreamId,
+                    System.currentTimeMillis(),
+                    counter.get(),
+                    message.getData().length);
         }
     }
 
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
new file mode 100644
index 000000000..e61b7c3f7
--- /dev/null
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.withoutadmin;
+
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * A {@link Collector} that passes each collected record to a callback; exceptions from it are rethrown wrapped.
+ */
+public class CallbackCollector<T> implements Collector<T> {
+
+    private final ThrowingConsumer<T, Exception> callback; // invoked once for each record passed to collect()
+
+    public CallbackCollector(ThrowingConsumer<T, Exception> callback) { // NOTE(review): callback is not null-checked
+        this.callback = callback;
+    }
+
+    @Override
+    public void collect(T t) {
+        try {
+            callback.accept(t);
+        } catch (Exception e) {
+            throw new RuntimeException(e); // any Exception, checked or unchecked, is wrapped in a new RuntimeException
+        }
+    }
+
+    @Override
+    public void close() {
+        // No-op: this collector holds only the callback and has nothing to release.
+    }
+}