You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/09/21 09:58:01 UTC

[inlong] branch master updated: [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4be184c42 [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980)
4be184c42 is described below

commit 4be184c42ff20399da0f8339854deb5c5829796b
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Wed Sep 21 17:57:56 2022 +0800

    [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980)
---
 .../table/DynamicPulsarDeserializationSchema.java  | 56 ++--------------
 .../pulsar/table/PulsarDynamicTableSource.java     |  4 +-
 .../pulsar/withoutadmin/FlinkPulsarSource.java     | 78 +++++++++++++++++++---
 3 files changed, 75 insertions(+), 63 deletions(-)

diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index bd4ecaaee..f9a9e2d20 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -18,11 +18,8 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
 import org.apache.flink.streaming.util.serialization.FlinkSchema;
 import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
@@ -33,8 +30,6 @@ import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
 import org.apache.pulsar.client.api.Message;
@@ -43,15 +38,13 @@ import org.apache.pulsar.client.api.Schema;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.List;
 
 /**
  * A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}.
  */
-class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<RowData> {
+public class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<RowData> {
 
     private static final long serialVersionUID = 1L;
     private static final ThreadLocal<SimpleCollector<RowData>> tlsCollector =
@@ -109,49 +102,6 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             keyDeserialization.open(context);
         }
         valueDeserialization.open(context);
-
-        MetricOption metricOption = MetricOption.builder()
-                .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
-                .withRegisterMetric(RegisteredMetric.ALL)
-                .build();
-        if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption, getMetricGroup(context));
-        }
-    }
-
-    /**
-     * reflect get metricGroup
-     *
-     * @param context Contextual information that can be used during initialization.
-     * @return metric group that can be used to register new metrics with Flink and to create a nested hierarchy based
-     *         on the group names.
-     */
-    private MetricGroup getMetricGroup(DeserializationSchema.InitializationContext context)
-            throws NoSuchFieldException, IllegalAccessException {
-        MetricGroup metricGroup;
-        String className = "RuntimeContextDeserializationInitializationContextAdapter";
-        String fieldName = "runtimeContext";
-        Class runtimeContextDeserializationInitializationContextAdapter = null;
-        Class[] innerClazz = RuntimeContextInitializationContextAdapters.class.getDeclaredClasses();
-        for (Class clazz : innerClazz) {
-            int mod = clazz.getModifiers();
-            if (Modifier.isPrivate(mod)) {
-                if (className.equalsIgnoreCase(clazz.getSimpleName())) {
-                    runtimeContextDeserializationInitializationContextAdapter = clazz;
-                    break;
-                }
-            }
-        }
-        if (runtimeContextDeserializationInitializationContextAdapter != null) {
-            Field field = runtimeContextDeserializationInitializationContextAdapter.getDeclaredField(fieldName);
-            field.setAccessible(true);
-            RuntimeContext runtimeContext = (RuntimeContext) field.get(context);
-            metricGroup = runtimeContext.getMetricGroup();
-        } else {
-            metricGroup = context.getMetricGroup();
-        }
-        return metricGroup;
     }
 
     @Override
@@ -159,6 +109,10 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
         return false;
     }
 
+    public void setMetricData(SourceMetricData metricData) {
+        this.sourceMetricData = metricData;
+    }
+
     @Override
     public RowData deserialize(Message<RowData> message) throws IOException {
         final SimpleCollector<RowData> collector = tlsCollector.get();
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
index 22993e6fc..a536fe233 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
@@ -320,7 +320,9 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
                 serviceUrl,
                 clientConfigurationData,
                 deserializationSchema,
-                properties
+                properties,
+                inlongMetric,
+                auditHostAndPorts
         );
 
         if (watermarkStrategy != null) {
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
index 1db76eeb0..9cc8d11fe 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
@@ -21,7 +21,6 @@ package org.apache.inlong.sort.pulsar.withoutadmin;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
@@ -47,7 +46,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
 import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
 import org.apache.flink.streaming.connectors.pulsar.internal.MessageIdSerializer;
-import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSourceStateSerializer;
 import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
 import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
@@ -60,6 +58,12 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.pulsar.table.DynamicPulsarDeserializationSchema;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -87,6 +91,9 @@ import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.Puls
 import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_SUCCEEDED_METRICS_COUNTER;
 import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.PULSAR_SOURCE_METRICS_GROUP;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
 
 /**
  * Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9,
@@ -223,11 +230,31 @@ public class FlinkPulsarSource<T>
 
     private transient int numParallelTasks;
 
+
+    private MetricState metricState;
+
+    /**
+     * InLong metric labels, used to build the metric option for this source
+     */
+    private String inlongMetric;
+    /**
+     * InLong audit host and ports, used to build the metric option for this source
+     */
+    private String inlongAudit;
+
+    private SourceMetricData sourceMetricData;
+
+    private transient ListState<MetricState> metricStateListState;
+
     public FlinkPulsarSource(
             String serverUrl,
             ClientConfigurationData clientConf,
             PulsarDeserializationSchema<T> deserializer,
-            Properties properties) {
+            Properties properties,
+            String inlongMetric,
+            String inlongAudit) {
+        this.inlongAudit = inlongAudit;
+        this.inlongMetric = inlongMetric;
         this.serverUrl = checkNotNull(serverUrl);
         this.clientConfigurationData = checkNotNull(clientConf);
         this.deserializer = deserializer;
@@ -253,14 +280,6 @@ public class FlinkPulsarSource<T>
         this.oldStateVersion = SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion);
     }
 
-    public FlinkPulsarSource(
-            String serviceUrl,
-            DeserializationSchema<T> deserializer,
-            Properties properties) {
-        this(serviceUrl, PulsarClientUtils.newClientConf(checkNotNull(serviceUrl), properties),
-                PulsarDeserializationSchema.valueOnly(deserializer), properties);
-    }
-
     // ------------------------------------------------------------------------
     //  Configuration
     // ------------------------------------------------------------------------
@@ -408,14 +427,34 @@ public class FlinkPulsarSource<T>
 
     @Override
     public void open(Configuration parameters) throws Exception {
+
+        MetricOption metricOption = MetricOption.builder()
+            .withInlongLabels(inlongMetric)
+            .withInlongAudit(inlongAudit)
+            .withRegisterMetric(RegisteredMetric.ALL)
+            .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+            .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
+            .build();
+
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+
         if (this.deserializer != null) {
+
+            DynamicPulsarDeserializationSchema dynamicKafkaDeserializationSchema =
+                (DynamicPulsarDeserializationSchema) deserializer;
+            dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData);
+
             this.deserializer.open(
                     RuntimeContextInitializationContextAdapters.deserializationAdapter(
                             getRuntimeContext(),
                             metricGroup -> metricGroup.addGroup("user")
                     )
             );
+
         }
+
         this.taskIndex = getRuntimeContext().getIndexOfThisSubtask();
         this.numParallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
 
@@ -708,10 +747,21 @@ public class FlinkPulsarSource<T>
                         createStateSerializer()
                 ));
 
+        if (this.inlongMetric != null) {
+            this.metricStateListState =
+                stateStore.getUnionListState(
+                    new ListStateDescriptor<>(
+                        INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+
         if (context.isRestored()) {
             restoredState = new TreeMap<>();
             Iterator<Tuple2<TopicSubscription, MessageId>> iterator = unionOffsetStates.get().iterator();
 
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+
             if (!iterator.hasNext()) {
                 iterator = tryMigrateState(stateStore);
             }
@@ -809,6 +859,12 @@ public class FlinkPulsarSource<T>
         if (!running) {
             log.debug("snapshotState() called on closed source");
         } else {
+
+            if (sourceMetricData != null && metricStateListState != null) {
+                MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+            }
+
             unionOffsetStates.clear();
 
             PulsarFetcher<T> fetcher = this.pulsarFetcher;