You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 09:25:46 UTC

[inlong] branch release-1.3.0 updated (86a53e1e2 -> f92c1839d)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 86a53e1e2 [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)
     new 632a0261b [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)
     new 4109c08b7 [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973)
     new 9f05ea51e [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)
     new 5417acad8 [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968)
     new f92c1839d [INLONG-5943][Sort] Add metric state for JDBC  (#5966)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../inlong/sort/base/metric/MetricOption.java      |  8 +++
 .../sort/elasticsearch/ElasticsearchSinkBase.java  |  3 +
 .../elasticsearch/ElasticsearchSinkFunction.java   | 10 +++
 .../table/RowElasticsearchSinkFunction.java        | 38 ++++++++++
 .../hive/filesystem/AbstractStreamingWriter.java   | 31 ++++++++
 .../iceberg/flink/sink/IcebergStreamWriter.java    | 41 +++++++++++
 .../sort/iceberg/sink/IcebergStreamWriter.java     | 41 +++++++++++
 .../jdbc/internal/AbstractJdbcOutputFormat.java    | 83 ++++++++++++++++++++++
 .../jdbc/internal/GenericJdbcSinkFunction.java     |  5 +-
 .../jdbc/internal/JdbcBatchingOutputFormat.java    | 40 ++++++++++-
 10 files changed, 297 insertions(+), 3 deletions(-)
 create mode 100644 inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java


[inlong] 03/05: [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 9f05ea51e04775a543f358bb4f34c16ee6c48331
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:05:23 2022 +0800

    [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../inlong/sort/base/metric/MetricOption.java      |  8 +++++
 .../iceberg/flink/sink/IcebergStreamWriter.java    | 41 ++++++++++++++++++++++
 2 files changed, 49 insertions(+)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index f4c679f9c..8cf0d6f01 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -111,6 +111,14 @@ public class MetricOption {
         return initBytes;
     }
 
+    public void setInitRecords(long initRecords) {
+        this.initRecords = initRecords;
+    }
+
+    public void setInitBytes(long initBytes) {
+        this.initBytes = initBytes;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
index bb00b7808..ef7612743 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
@@ -19,6 +19,12 @@
 
 package org.apache.inlong.sort.iceberg.flink.sink;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -29,11 +35,17 @@ import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
@@ -51,6 +63,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
     private transient int attemptId;
     @Nullable
     private transient SinkMetricData metricData;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
 
     IcebergStreamWriter(String fullTableName,
             TaskWriterFactory<T> taskWriterFactory,
@@ -74,6 +88,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
 
         // Initialize metric
         if (metricOption != null) {
+            metricOption.setInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L);
+            metricOption.setInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L);
             metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
     }
@@ -95,6 +111,31 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         }
     }
 
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+        // init metric state
+        if (this.metricData != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        if (metricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+
     @Override
     public void close() throws Exception {
         super.close();


[inlong] 01/05: [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 632a0261b0629692430b1d9d7e42a7385910e6e3
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:02:06 2022 +0800

    [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../hive/filesystem/AbstractStreamingWriter.java   | 31 ++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index ab2e845f2..dd9456203 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -18,6 +18,10 @@
 
 package org.apache.inlong.sort.hive.filesystem;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -34,9 +38,16 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+
 import javax.annotation.Nullable;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Operator for file system sink. It is an operator version of {@link StreamingFileSink}. It can send
  * file and bucket information to downstream.
@@ -70,6 +81,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
 
     @Nullable
     private transient SinkMetricData metricData;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
 
     public AbstractStreamingWriter(
             long bucketCheckInterval,
@@ -113,6 +126,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
                 .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
@@ -151,12 +166,28 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
                         bucketCheckInterval);
 
         currentWatermark = Long.MIN_VALUE;
+
+        // init metric state
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
         helper.snapshotState(context.getCheckpointId());
+        if (metricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
     }
 
     @Override


[inlong] 04/05: [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 5417acad8749a6cb8d88a51ec6fa28df9e382f80
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Wed Sep 21 16:44:49 2022 +0800

    [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968)
---
 .../sort/elasticsearch/ElasticsearchSinkBase.java  |  3 ++
 .../elasticsearch/ElasticsearchSinkFunction.java   | 10 ++++++
 .../table/RowElasticsearchSinkFunction.java        | 38 ++++++++++++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index cdea07fa5..dff8b9335 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -293,6 +293,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
         // delegate state initialization to elasticsearchSinkFunction
+        elasticsearchSinkFunction.initializeState(context);
     }
 
     @Override
@@ -305,6 +306,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
                 checkAsyncErrorsAndRequests();
             }
         }
+
+        elasticsearchSinkFunction.snapshotState(context);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
index dc4bf5af1..e1c019918 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
@@ -21,6 +21,8 @@ package org.apache.inlong.sort.elasticsearch;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.elasticsearch.action.ActionRequest;
 
@@ -59,4 +61,12 @@ public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
      * @param indexer request indexer that {@code ActionRequest} should be added to
      */
     void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+
+    default void initializeState(FunctionInitializationContext context) throws Exception {
+        // no initialization needed
+    }
+
+    default void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+    }
 }
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index e4f1419c5..a5abc4690 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -20,8 +20,16 @@ package org.apache.inlong.sort.elasticsearch.table;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.table.api.TableException;
@@ -40,6 +48,10 @@ import javax.annotation.Nullable;
 import java.util.Objects;
 import java.util.function.Function;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /** Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. */
 public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
 
@@ -51,6 +63,8 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
     private final XContentType contentType;
     private final RequestFactory requestFactory;
     private final Function<RowData, String> createKey;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
     private final String inlongMetric;
     private final String auditHostAndPorts;
 
@@ -88,6 +102,8 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
                 .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withRegisterMetric(RegisteredMetric.NORMAL)
                 .build();
         if (metricOption != null) {
@@ -101,6 +117,28 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
         }
     }
 
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    runtimeContext.getIndexOfThisSubtask(), runtimeContext.getNumberOfParallelSubtasks());
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        if (sinkMetricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData,
+                    runtimeContext.getIndexOfThisSubtask());
+        }
+    }
+
     @Override
     public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
         switch (element.getRowKind()) {


[inlong] 02/05: [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 4109c08b7bb9aefd0583a1410944a47c3bfc8c6d
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:04:01 2022 +0800

    [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../sort/iceberg/sink/IcebergStreamWriter.java     | 41 ++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 8318c7177..6f1b75ea0 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -19,6 +19,12 @@
 
 package org.apache.inlong.sort.iceberg.sink;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -30,11 +36,17 @@ import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
@@ -53,6 +65,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
     private transient int attemptId;
     @Nullable
     private transient SinkMetricData metricData;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
 
     IcebergStreamWriter(
             String fullTableName,
@@ -81,6 +95,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
                 .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
@@ -105,6 +121,31 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         }
     }
 
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+        // init metric state
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        if (metricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+
     @Override
     public void dispose() throws Exception {
         super.dispose();


[inlong] 05/05: [INLONG-5943][Sort] Add metric state for JDBC (#5966)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f92c1839d583b8efad66bea3b7608b347f3aa683
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Wed Sep 21 16:46:04 2022 +0800

    [INLONG-5943][Sort] Add metric state for JDBC  (#5966)
---
 .../jdbc/internal/AbstractJdbcOutputFormat.java    | 83 ++++++++++++++++++++++
 .../jdbc/internal/GenericJdbcSinkFunction.java     |  5 +-
 .../jdbc/internal/JdbcBatchingOutputFormat.java    | 40 ++++++++++-
 3 files changed, 125 insertions(+), 3 deletions(-)

diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java
new file mode 100644
index 000000000..e45537d7e
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.sql.Connection;
+
+/** Base jdbc outputFormat. */
+public abstract class AbstractJdbcOutputFormat<T> extends RichOutputFormat<T> implements Flushable {
+
+    private static final long serialVersionUID = 1L;
+    public static final int DEFAULT_FLUSH_MAX_SIZE = 5000;
+    public static final long DEFAULT_FLUSH_INTERVAL_MILLS = 0L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(
+            org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.class);
+    protected final JdbcConnectionProvider connectionProvider;
+
+    public AbstractJdbcOutputFormat(JdbcConnectionProvider connectionProvider) {
+        this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
+    }
+
+    @Override
+    public void configure(Configuration parameters) {
+
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        try {
+            connectionProvider.getOrEstablishConnection();
+        } catch (Exception e) {
+            throw new IOException("unable to open JDBC writer", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        connectionProvider.closeConnection();
+    }
+
+    @Override
+    public void flush() throws IOException {
+
+    }
+
+    @VisibleForTesting
+    public Connection getConnection() {
+        return connectionProvider.getConnection();
+    }
+
+    abstract void snapshotState(FunctionSnapshotContext context) throws Exception;
+
+    abstract void initializeState(FunctionInitializationContext context) throws Exception;
+}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
index 717ed3cd0..0afd5fb24 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
@@ -21,7 +21,6 @@ package org.apache.inlong.sort.jdbc.internal;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -60,12 +59,14 @@ public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
     }
 
     @Override
-    public void initializeState(FunctionInitializationContext context) {
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+            outputFormat.initializeState(context);
     }
 
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
         outputFormat.flush();
+        outputFormat.snapshotState(context);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index e32d4094b..33346d686 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -19,10 +19,13 @@
 package org.apache.inlong.sort.jdbc.internal;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
-import org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat;
 import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
 import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
 import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
@@ -30,12 +33,16 @@ import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
 import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
 import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,8 +57,12 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+
 import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * A JDBC outputFormat that supports batching records before writing records to database.
@@ -76,6 +87,8 @@ public class JdbcBatchingOutputFormat<
     private transient volatile Exception flushException;
     private transient RuntimeContext runtimeContext;
 
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
     private SinkMetricData sinkMetricData;
     private Long dataSize = 0L;
     private Long rowSize = 0L;
@@ -127,6 +140,8 @@ public class JdbcBatchingOutputFormat<
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
                 .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
@@ -216,6 +231,29 @@ public class JdbcBatchingOutputFormat<
         jdbcStatementExecutor.addToBatch(extracted);
     }
 
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        if (sinkMetricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
+    }
+
     @Override
     public synchronized void flush() throws IOException {
         checkFlushException();