You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 10:57:22 UTC

[inlong] branch release-1.3.0 updated (58cfcd29c -> 9afd220eb)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 58cfcd29c [INLONG-5950][Sort] Support metric state recovery for mongo-cdc (#5951)
     new d32537802 [INLONG-5959][Sort] Support metric state recovery for filesystem (#5961)
     new 9afd220eb [INLONG-5955][Sort] Support metric state recovery for HBase (#5960)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../filesystem/stream/AbstractStreamingWriter.java | 37 +++++++++++++++++++---
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  | 30 ++++++++++++++++--
 2 files changed, 61 insertions(+), 6 deletions(-)


[inlong] 01/02: [INLONG-5959][Sort] Support metric state recovery for filesystem (#5961)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d32537802972d454a81808994947f632b611441d
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Tue Sep 20 17:42:03 2022 +0800

    [INLONG-5959][Sort] Support metric state recovery for filesystem (#5961)
---
 .../filesystem/stream/AbstractStreamingWriter.java | 37 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 95267c698..9edcc82b2 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -18,6 +18,10 @@
 
 package org.apache.inlong.sort.filesystem.stream;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -34,7 +38,13 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * Operator for file system sink. It is an operator version of {@link StreamingFileSink}. It can send
@@ -53,9 +63,12 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
             IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>>
             bucketsBuilder;
 
-    private String inlongMetric;
+    private final String inlongMetric;
+    private final String inlongAudit;
 
-    private String inlongAudit;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+    private SinkMetricData sinkMetricData;
 
     // --------------------------- runtime fields -----------------------------
 
@@ -103,11 +116,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
         super.open();
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withRegisterMetric(RegisteredMetric.ALL)
                 .withInlongAudit(inlongAudit)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+            sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
     }
 
@@ -149,12 +164,26 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
                         bucketCheckInterval);
 
         currentWatermark = Long.MIN_VALUE;
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
         helper.snapshotState(context.getCheckpointId());
+        if (sinkMetricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
     }
 
     @Override


[inlong] 02/02: [INLONG-5955][Sort] Support metric state recovery for HBase (#5960)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 9afd220eb07f5f4f9e1c1df38cac85ed57ad3ee4
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Tue Sep 20 17:47:36 2022 +0800

    [INLONG-5955][Sort] Support metric state recovery for HBase (#5960)
---
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  | 30 ++++++++++++++++++++--
 1 file changed, 28 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index a1e9641d2..10df3d14f 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -20,6 +20,10 @@ package org.apache.inlong.sort.hbase.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.hbase.sink.HBaseMutationConverter;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
@@ -39,7 +43,9 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +57,9 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * The sink function for HBase.
@@ -86,6 +95,9 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
      * </p>
      */
     private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+    private SinkMetricData sinkMetricData;
     private transient Connection connection;
     private transient BufferedMutator mutator;
     private transient ScheduledExecutorService executor;
@@ -93,7 +105,6 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
     private transient AtomicLong numPendingRequests;
     private transient RuntimeContext runtimeContext;
     private transient volatile boolean closed = false;
-    private SinkMetricData sinkMetricData;
     private Long dataSize = 0L;
     private Long rowSize = 0L;
 
@@ -126,6 +137,8 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
             MetricOption metricOption = MetricOption.builder()
                     .withInlongLabels(inlongMetric)
                     .withInlongAudit(inlongAudit)
+                    .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                    .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                     .withRegisterMetric(RegisteredMetric.ALL)
                     .build();
             if (metricOption != null) {
@@ -290,11 +303,24 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
         while (numPendingRequests.get() != 0) {
             flush();
         }
+        if (sinkMetricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
     }
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-        // nothing to do.
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
     }
 
     @Override