You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/09/21 10:00:35 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new cdaa9a7f0 [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980)
cdaa9a7f0 is described below
commit cdaa9a7f06956c33206c10aa1c1a4af8de5738f7
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Wed Sep 21 17:57:56 2022 +0800
[INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980)
---
.../table/DynamicPulsarDeserializationSchema.java | 56 ++--------------
.../pulsar/table/PulsarDynamicTableSource.java | 4 +-
.../pulsar/withoutadmin/FlinkPulsarSource.java | 78 +++++++++++++++++++---
3 files changed, 75 insertions(+), 63 deletions(-)
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index bd4ecaaee..f9a9e2d20 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -18,11 +18,8 @@
package org.apache.inlong.sort.pulsar.table;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
import org.apache.flink.streaming.util.serialization.FlinkSchema;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
@@ -33,8 +30,6 @@ import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
import org.apache.pulsar.client.api.Message;
@@ -43,15 +38,13 @@ import org.apache.pulsar.client.api.Schema;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
/**
* A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}.
*/
-class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<RowData> {
+public class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
private static final ThreadLocal<SimpleCollector<RowData>> tlsCollector =
@@ -109,49 +102,6 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
keyDeserialization.open(context);
}
valueDeserialization.open(context);
-
- MetricOption metricOption = MetricOption.builder()
- .withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
- .withRegisterMetric(RegisteredMetric.ALL)
- .build();
- if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption, getMetricGroup(context));
- }
- }
-
- /**
- * reflect get metricGroup
- *
- * @param context Contextual information that can be used during initialization.
- * @return metric group that can be used to register new metrics with Flink and to create a nested hierarchy based
- * on the group names.
- */
- private MetricGroup getMetricGroup(DeserializationSchema.InitializationContext context)
- throws NoSuchFieldException, IllegalAccessException {
- MetricGroup metricGroup;
- String className = "RuntimeContextDeserializationInitializationContextAdapter";
- String fieldName = "runtimeContext";
- Class runtimeContextDeserializationInitializationContextAdapter = null;
- Class[] innerClazz = RuntimeContextInitializationContextAdapters.class.getDeclaredClasses();
- for (Class clazz : innerClazz) {
- int mod = clazz.getModifiers();
- if (Modifier.isPrivate(mod)) {
- if (className.equalsIgnoreCase(clazz.getSimpleName())) {
- runtimeContextDeserializationInitializationContextAdapter = clazz;
- break;
- }
- }
- }
- if (runtimeContextDeserializationInitializationContextAdapter != null) {
- Field field = runtimeContextDeserializationInitializationContextAdapter.getDeclaredField(fieldName);
- field.setAccessible(true);
- RuntimeContext runtimeContext = (RuntimeContext) field.get(context);
- metricGroup = runtimeContext.getMetricGroup();
- } else {
- metricGroup = context.getMetricGroup();
- }
- return metricGroup;
}
@Override
@@ -159,6 +109,10 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
return false;
}
+ public void setMetricData(SourceMetricData metricData) {
+ this.sourceMetricData = metricData;
+ }
+
@Override
public RowData deserialize(Message<RowData> message) throws IOException {
final SimpleCollector<RowData> collector = tlsCollector.get();
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
index 22993e6fc..a536fe233 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
@@ -320,7 +320,9 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
serviceUrl,
clientConfigurationData,
deserializationSchema,
- properties
+ properties,
+ inlongMetric,
+ auditHostAndPorts
);
if (watermarkStrategy != null) {
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
index 1db76eeb0..9cc8d11fe 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
@@ -21,7 +21,6 @@ package org.apache.inlong.sort.pulsar.withoutadmin;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
@@ -47,7 +46,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.MessageIdSerializer;
-import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSourceStateSerializer;
import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
@@ -60,6 +58,12 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.pulsar.table.DynamicPulsarDeserializationSchema;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -87,6 +91,9 @@ import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.Puls
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_SUCCEEDED_METRICS_COUNTER;
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.PULSAR_SOURCE_METRICS_GROUP;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
/**
* Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9,
@@ -223,11 +230,31 @@ public class FlinkPulsarSource<T>
private transient int numParallelTasks;
+
+ private MetricState metricState;
+
+ /**
+ * Metric for InLong
+ */
+ private String inlongMetric;
+ /**
+ * audit host and ports
+ */
+ private String inlongAudit;
+
+ private SourceMetricData sourceMetricData;
+
+ private transient ListState<MetricState> metricStateListState;
+
public FlinkPulsarSource(
String serverUrl,
ClientConfigurationData clientConf,
PulsarDeserializationSchema<T> deserializer,
- Properties properties) {
+ Properties properties,
+ String inlongMetric,
+ String inlongAudit) {
+ this.inlongAudit = inlongAudit;
+ this.inlongMetric = inlongMetric;
this.serverUrl = checkNotNull(serverUrl);
this.clientConfigurationData = checkNotNull(clientConf);
this.deserializer = deserializer;
@@ -253,14 +280,6 @@ public class FlinkPulsarSource<T>
this.oldStateVersion = SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion);
}
- public FlinkPulsarSource(
- String serviceUrl,
- DeserializationSchema<T> deserializer,
- Properties properties) {
- this(serviceUrl, PulsarClientUtils.newClientConf(checkNotNull(serviceUrl), properties),
- PulsarDeserializationSchema.valueOnly(deserializer), properties);
- }
-
// ------------------------------------------------------------------------
// Configuration
// ------------------------------------------------------------------------
@@ -408,14 +427,34 @@ public class FlinkPulsarSource<T>
@Override
public void open(Configuration parameters) throws Exception {
+
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(inlongAudit)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
+ .build();
+
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
+ }
+
if (this.deserializer != null) {
+
+ DynamicPulsarDeserializationSchema dynamicKafkaDeserializationSchema =
+ (DynamicPulsarDeserializationSchema) deserializer;
+ dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData);
+
this.deserializer.open(
RuntimeContextInitializationContextAdapters.deserializationAdapter(
getRuntimeContext(),
metricGroup -> metricGroup.addGroup("user")
)
);
+
}
+
this.taskIndex = getRuntimeContext().getIndexOfThisSubtask();
this.numParallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
@@ -708,10 +747,21 @@ public class FlinkPulsarSource<T>
createStateSerializer()
));
+ if (this.inlongMetric != null) {
+ this.metricStateListState =
+ stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
+
if (context.isRestored()) {
restoredState = new TreeMap<>();
Iterator<Tuple2<TopicSubscription, MessageId>> iterator = unionOffsetStates.get().iterator();
+ metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+
if (!iterator.hasNext()) {
iterator = tryMigrateState(stateStore);
}
@@ -809,6 +859,12 @@ public class FlinkPulsarSource<T>
if (!running) {
log.debug("snapshotState() called on closed source");
} else {
+
+ if (sourceMetricData != null && metricStateListState != null) {
+ MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+
unionOffsetStates.clear();
PulsarFetcher<T> fetcher = this.pulsarFetcher;