You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@inlong.apache.org by pa...@apache.org on 2023/01/04 09:01:38 UTC
[inlong] branch master updated: [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)
This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e305ae869 [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)
e305ae869 is described below
commit e305ae8692832f592fde722c9143f0a13580ac8c
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Jan 4 17:01:31 2023 +0800
[INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)
---
.../sort/base/metric/sub/SinkTableMetricData.java | 13 ++++
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 2 +-
.../sink/multiple/DynamicSchemaHandleOperator.java | 84 ++++++++++++++++++++--
.../sink/multiple/IcebergMultipleStreamWriter.java | 60 +++-------------
.../sink/multiple/IcebergSingleStreamWriter.java | 10 ++-
5 files changed, 110 insertions(+), 59 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index a5690a5b5..842584ba7 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -256,6 +256,19 @@ public class SinkTableMetricData extends SinkMetricData implements SinkSubMetric
subSinkMetricData.invokeDirty(rowCount, rowSize);
}
+ /**
+ * output dirty metrics with estimate
+ *
+ * @param database the database name of record
+ * @param schema the schema name of record
+ * @param table the table name of record
+ * @param data the dirty data
+ */
+ public void outputDirtyMetricsWithEstimate(String database, String schema, String table, Object data) {
+ long size = data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length;
+ outputDirtyMetrics(database, schema, table, 1, size);
+ }
+
public void outputDirtyMetricsWithEstimate(Object data) {
long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
invokeDirty(1, size);
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index a5f070522..b0ed02abe 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -548,7 +548,7 @@ public class FlinkSink {
int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
DynamicSchemaHandleOperator routeOperator = new DynamicSchemaHandleOperator(
- catalogLoader, multipleSinkOption, dirtyOptions, dirtySink);
+ catalogLoader, multipleSinkOption, dirtyOptions, dirtySink, inlongMetric, auditHostAndPorts);
SingleOutputStreamOperator<RecordWithSchema> routeStream = input
.transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
TypeInformation.of(RecordWithSchema.class),
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 37bb6f944..53fef8dd1 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -17,6 +17,12 @@
package org.apache.inlong.sort.iceberg.sink.multiple;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -43,9 +49,14 @@ import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.base.sink.TableChange;
import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +72,12 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWithSchema>
implements
OneInputStreamOperator<RowData, RecordWithSchema>,
@@ -90,13 +107,22 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
+ // metric
+ private final String inlongMetric;
+ private final String auditHostAndPorts;
+ private @Nullable transient SinkTableMetricData metricData;
+ private transient ListState<MetricState> metricStateListState;
+ private transient MetricState metricState;
+
public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
MultipleSinkOption multipleSinkOption, DirtyOptions dirtyOptions,
- @Nullable DirtySink<Object> dirtySink) {
+ @Nullable DirtySink<Object> dirtySink, String inlongMetric, String auditHostAndPorts) {
this.catalogLoader = catalogLoader;
this.multipleSinkOption = multipleSinkOption;
this.dirtyOptions = dirtyOptions;
this.dirtySink = dirtySink;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
}
@SuppressWarnings("unchecked")
@@ -117,6 +143,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
this.recordQueues = new HashMap<>();
this.schemaCache = new HashMap<>();
this.blacklist = new HashSet<>();
+
+ // Initialize metric
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+ .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ metricData = new SinkTableMetricData(metricOption, getRuntimeContext().getMetricGroup());
+ }
}
@Override
@@ -136,14 +176,15 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
LOGGER.error(String.format("Deserialize error, raw data: %s",
new String(element.getValue().getBinary(0))), e);
handleDirtyData(new String(element.getValue().getBinary(0)),
- null, DirtyType.DESERIALIZE_ERROR, e);
+ null, DirtyType.DESERIALIZE_ERROR, e, TableIdentifier.of("unknow", "unknow"));
}
TableIdentifier tableId = null;
try {
tableId = parseId(jsonNode);
} catch (Exception e) {
LOGGER.error(String.format("Table identifier parse error, raw data: %s", jsonNode), e);
- handleDirtyData(jsonNode, jsonNode, DirtyType.TABLE_IDENTIFIER_PARSE_ERROR, e);
+ handleDirtyData(jsonNode, jsonNode, DirtyType.TABLE_IDENTIFIER_PARSE_ERROR,
+ e, TableIdentifier.of("unknow", "unknow"));
}
if (blacklist.contains(tableId)) {
return;
@@ -156,7 +197,11 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
}
}
- private void handleDirtyData(Object dirtyData, JsonNode rootNode, DirtyType dirtyType, Exception e) {
+ private void handleDirtyData(Object dirtyData,
+ JsonNode rootNode,
+ DirtyType dirtyType,
+ Exception e,
+ TableIdentifier tableId) {
if (!dirtyOptions.ignoreDirty()) {
RuntimeException ex;
if (e instanceof RuntimeException) {
@@ -182,6 +227,10 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
.setIdentifier(dirtyOptions.getIdentifier());
}
dirtySink.invoke(builder.build());
+ if (metricData != null) {
+ metricData.outputDirtyMetricsWithEstimate(
+ tableId.namespace().toString(), null, tableId.name(), dirtyData);
+ }
} catch (Exception ex) {
if (!dirtyOptions.ignoreSideOutputErrors()) {
throw new RuntimeException(ex);
@@ -198,6 +247,29 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL, this);
}
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ if (metricData != null && metricStateListState != null) {
+ MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ // init metric state
+ if (this.inlongMetric != null) {
+ this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>(
+ String.format(INLONG_METRIC_STATE_NAME), TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
+ if (context.isRestored()) {
+ metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+ }
+
private void execDDL(JsonNode jsonNode, TableIdentifier tableId) {
// todo:parse ddl sql
}
@@ -242,7 +314,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
LOG.warn("Ignore table {} schema change, old: {} new: {}.",
tableId, dataSchema, latestSchema, e);
blacklist.add(tableId);
- handleDirtyData(jsonNode, jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e);
+ handleDirtyData(jsonNode, jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId);
}
return Collections.emptyList();
});
@@ -329,7 +401,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
tableId,
pkListStr);
} catch (Exception e) {
- handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e);
+ handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e, tableId);
}
return null;
}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 1abd5b97a..3022ec6c0 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -17,10 +17,6 @@
package org.apache.inlong.sort.iceberg.sink.multiple;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -38,13 +34,9 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
import org.slf4j.Logger;
@@ -65,9 +57,7 @@ import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
/**
* Iceberg writer that can distinguish different sink tables and route and distribute data into different
@@ -93,10 +83,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
// metric
private final String inlongMetric;
private final String auditHostAndPorts;
- @Nullable
- private transient SinkMetricData metricData;
- private transient ListState<MetricState> metricStateListState;
- private transient MetricState metricState;
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
@@ -123,18 +109,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
this.multipleWriters = new HashMap<>();
this.multipleTables = new HashMap<>();
this.multipleSchemas = new HashMap<>();
-
- // Initialize metric
- MetricOption metricOption = MetricOption.builder()
- .withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
- .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
- .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
- .withRegisterMetric(RegisteredMetric.ALL)
- .build();
- if (metricOption != null) {
- metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
- }
}
@Override
@@ -202,9 +176,14 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
appendMode);
if (multipleWriters.get(tableId) == null) {
+ StringBuilder subWriterInlongMetric = new StringBuilder(inlongMetric);
+ subWriterInlongMetric.append(DELIMITER)
+ .append(Constants.DATABASE_NAME).append("=").append(tableId.namespace().toString())
+ .append(DELIMITER)
+ .append(Constants.TABLE_NAME).append("=").append(tableId.name());
IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>(
- tableId.toString(), taskWriterFactory, null,
- null, flinkRowType, dirtyOptions, dirtySink);
+ tableId.toString(), taskWriterFactory, subWriterInlongMetric.toString(),
+ auditHostAndPorts, flinkRowType, dirtyOptions, dirtySink);
writer.setup(getRuntimeContext(),
new CallbackCollector<>(
writeResult -> collector.collect(new MultipleWriteResult(tableId, writeResult))),
@@ -223,9 +202,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
if (multipleWriters.get(tableId) != null) {
for (RowData data : recordWithSchema.getData()) {
multipleWriters.get(tableId).processElement(data);
- if (metricData != null) {
- metricData.invokeWithEstimate(data);
- }
}
} else {
LOG.error("Unregistered table schema for {}.", recordWithSchema.getTableId());
@@ -244,29 +220,11 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry : multipleWriters.entrySet()) {
entry.getValue().snapshotState(context);
}
-
- // metric
- if (metricData != null && metricStateListState != null) {
- MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
- getRuntimeContext().getIndexOfThisSubtask());
- }
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.functionInitializationContext = context;
-
- // init metric state
- if (this.inlongMetric != null) {
- this.metricStateListState = context.getOperatorStateStore().getUnionListState(
- new ListStateDescriptor<>(
- INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
- })));
- }
- if (context.isRestored()) {
- metricState = MetricStateUtils.restoreMetricState(metricStateListState,
- getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
- }
}
private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index c8bfbeb08..dc30c5b21 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -44,6 +44,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -105,6 +107,8 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
.withInlongAudit(auditHostAndPorts)
.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+ .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
@@ -138,6 +142,9 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
.setRowType(flinkRowType)
.setDirtyMessage(e.getMessage());
dirtySink.invoke(builder.build());
+ if (metricData != null) {
+ metricData.invokeDirtyWithEstimate(value);
+ }
} catch (Exception ex) {
if (!dirtyOptions.ignoreSideOutputErrors()) {
throw new RuntimeException(ex);
@@ -157,7 +164,8 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
if (this.inlongMetric != null) {
this.metricStateListState = context.getOperatorStateStore().getUnionListState(
new ListStateDescriptor<>(
- INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ String.format("Iceberg(%s)-" + INLONG_METRIC_STATE_NAME, fullTableName),
+ TypeInformation.of(new TypeHint<MetricState>() {
})));
}
if (context.isRestored()) {