You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/20 09:47:41 UTC

[inlong] branch master updated: [INLONG-5955][Sort] Support metric state recovery for HBase (#5960)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0799b87ae [INLONG-5955][Sort] Support metric state recovery for HBase (#5960)
0799b87ae is described below

commit 0799b87aea4df45bf19fd8bad985b318437a7a61
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Tue Sep 20 17:47:36 2022 +0800

    [INLONG-5955][Sort] Support metric state recovery for HBase (#5960)
---
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  | 30 ++++++++++++++++++++--
 1 file changed, 28 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index a1e9641d2..10df3d14f 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -20,6 +20,10 @@ package org.apache.inlong.sort.hbase.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.hbase.sink.HBaseMutationConverter;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
@@ -39,7 +43,9 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +57,9 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * The sink function for HBase.
@@ -86,6 +95,9 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
      * </p>
      */
     private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+    private SinkMetricData sinkMetricData;
     private transient Connection connection;
     private transient BufferedMutator mutator;
     private transient ScheduledExecutorService executor;
@@ -93,7 +105,6 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
     private transient AtomicLong numPendingRequests;
     private transient RuntimeContext runtimeContext;
     private transient volatile boolean closed = false;
-    private SinkMetricData sinkMetricData;
     private Long dataSize = 0L;
     private Long rowSize = 0L;
 
@@ -126,6 +137,8 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
             MetricOption metricOption = MetricOption.builder()
                     .withInlongLabels(inlongMetric)
                     .withInlongAudit(inlongAudit)
+                    .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                    .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                     .withRegisterMetric(RegisteredMetric.ALL)
                     .build();
             if (metricOption != null) {
@@ -290,11 +303,24 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
         while (numPendingRequests.get() != 0) {
             flush();
         }
+        if (sinkMetricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
     }
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-        // nothing to do.
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
     }
 
     @Override