You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 10:57:24 UTC
[inlong] 02/02: [INLONG-5955][Sort] Support metric state recovery for HBase (#5960)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 9afd220eb07f5f4f9e1c1df38cac85ed57ad3ee4
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Tue Sep 20 17:47:36 2022 +0800
[INLONG-5955][Sort] Support metric state recovery for HBase (#5960)
---
.../inlong/sort/hbase/sink/HBaseSinkFunction.java | 30 ++++++++++++++++++++--
1 file changed, 28 insertions(+), 2 deletions(-)
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index a1e9641d2..10df3d14f 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -20,6 +20,10 @@ package org.apache.inlong.sort.hbase.sink;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.hbase.sink.HBaseMutationConverter;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
@@ -39,7 +43,9 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +57,9 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
/**
* The sink function for HBase.
@@ -86,6 +95,9 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
* </p>
*/
private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+ private transient ListState<MetricState> metricStateListState;
+ private transient MetricState metricState;
+ private SinkMetricData sinkMetricData;
private transient Connection connection;
private transient BufferedMutator mutator;
private transient ScheduledExecutorService executor;
@@ -93,7 +105,6 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
private transient AtomicLong numPendingRequests;
private transient RuntimeContext runtimeContext;
private transient volatile boolean closed = false;
- private SinkMetricData sinkMetricData;
private Long dataSize = 0L;
private Long rowSize = 0L;
@@ -126,6 +137,8 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
.withInlongAudit(inlongAudit)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
@@ -290,11 +303,24 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
while (numPendingRequests.get() != 0) {
flush();
}
+ if (sinkMetricData != null && metricStateListState != null) {
+ MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- // nothing to do.
+ if (this.inlongMetric != null) {
+ this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
+ if (context.isRestored()) {
+ metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+ }
}
@Override