You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/04/28 03:20:07 UTC
[inlong] branch master updated: [INLONG-7906][Sort] Improve logic of calculation object byte size (#7907)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 704ef3eb7 [INLONG-7906][Sort] Improve logic of calculation object byte size (#7907)
704ef3eb7 is described below
commit 704ef3eb7440b1ad3a41ab851203f56c1fe650e3
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Fri Apr 28 11:20:01 2023 +0800
[INLONG-7906][Sort] Improve logic of calculation object byte size (#7907)
---
.../inlong/sort/base/metric/SinkMetricData.java | 8 ++--
.../inlong/sort/base/metric/SourceMetricData.java | 14 +++----
.../sort/base/metric/sub/SinkTableMetricData.java | 7 ++--
.../sort/base/util/CalculateObjectSizeUtils.java | 47 ++++++++++++++++++++++
.../base/util/CalculateObjectSizeUtilsTest.java | 47 ++++++++++++++++++++++
.../table/DorisDynamicSchemaOutputFormat.java | 9 +++--
.../filesystem/stream/AbstractStreamingWriter.java | 7 ++--
.../inlong/sort/hbase/sink/HBaseSinkFunction.java | 5 ++-
.../hive/filesystem/AbstractStreamingWriter.java | 4 +-
.../sink/multiple/DynamicSchemaHandleOperator.java | 4 +-
.../sink/multiple/IcebergMultipleStreamWriter.java | 4 +-
.../jdbc/internal/JdbcBatchingOutputFormat.java | 4 +-
.../internal/TableMetricStatementExecutor.java | 9 +++--
13 files changed, 130 insertions(+), 39 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 3e0ae04b3..a130be24a 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -25,7 +25,6 @@ import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import java.nio.charset.StandardCharsets;
import java.util.Map;
import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
@@ -36,6 +35,7 @@ import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
+import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
/**
* A collection class for handling metrics
@@ -253,13 +253,11 @@ public class SinkMetricData implements MetricData {
}
public void invokeWithEstimate(Object o) {
- long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
- invoke(1, size);
+ invoke(1, getDataSize(o));
}
public void invokeDirtyWithEstimate(Object o) {
- long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
- invokeDirty(1, size);
+ invokeDirty(1, getDataSize(o));
}
public void invoke(long rowCount, long rowSize) {
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index e5ffdf844..b5ca46713 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -17,24 +17,24 @@
package org.apache.inlong.sort.base.metric;
-import java.util.List;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.inlong.audit.AuditOperator;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.Map;
+
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
+import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
/**
* A collection class for handling metrics
@@ -194,13 +194,11 @@ public class SourceMetricData implements MetricData {
}
public void outputMetricsWithEstimate(Object data) {
- long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
- outputMetrics(1, size);
+ outputMetrics(1, getDataSize(data));
}
public void outputMetricsWithEstimate(Object data, long dataTime) {
- long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
- outputMetrics(1, size, dataTime);
+ outputMetrics(1, getDataSize(data), dataTime);
}
public void outputMetrics(long rowCountSize, long rowDataSize) {
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index 92f9da48f..537bab033 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -38,6 +38,7 @@ import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
/**
* A collection class for handling sub metrics of table schema type
@@ -405,13 +406,11 @@ public class SinkTableMetricData extends SinkMetricData implements SinkSubMetric
* @param data the dirty data
*/
public void outputDirtyMetricsWithEstimate(String database, String table, Object data) {
- long size = data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length;
- outputDirtyMetrics(database, table, 1, size);
+ outputDirtyMetrics(database, table, 1, getDataSize(data));
}
public void outputDirtyMetricsWithEstimate(Object data) {
- long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
- invokeDirty(1, size);
+ invokeDirty(1, getDataSize(data));
}
@Override
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
new file mode 100644
index 000000000..0826eb2a9
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Utility for estimating the byte size of a record, e.g. for metric reporting.
+ */
+public class CalculateObjectSizeUtils {
+
+ /**
+ * Returns 0 for {@code null}, {@link BinaryRowData#getSizeInBytes} for a {@link BinaryRowData}
+ * (it does not implement {@link Object#toString}), else the UTF-8 length of {@code toString()}.
+ */
+ public static long getDataSize(Object object) {
+ if (object == null) {
+ return 0L;
+ }
+ long size;
+ if (object instanceof BinaryRowData) {
+ BinaryRowData binaryRowData = (BinaryRowData) object;
+ size = binaryRowData.getSizeInBytes();
+ } else {
+ size = object.toString().getBytes(StandardCharsets.UTF_8).length;
+ }
+ return size;
+ }
+
+}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtilsTest.java
new file mode 100644
index 000000000..cc6b85ebb
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtilsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link CalculateObjectSizeUtils#getDataSize}: null, plain string and binary row inputs.
+ */
+public class CalculateObjectSizeUtilsTest {
+
+ @Test
+ public void testGetDataSize() {
+ String data1 = null;
+ long expected1 = 0L; // null input is reported as zero bytes
+ long actual1 = CalculateObjectSizeUtils.getDataSize(data1);
+ Assert.assertEquals(expected1, actual1);
+
+ String data2 = "test";
+ long expected2 = 4L; // "test" is 4 ASCII characters, i.e. 4 bytes in UTF-8
+ long actual2 = CalculateObjectSizeUtils.getDataSize(data2);
+ Assert.assertEquals(expected2, actual2);
+
+ BinaryRowData data3 = BinaryRowDataUtil.EMPTY_ROW;
+ long expected3 = 8L; // NOTE(review): presumably the fixed header size of a zero-field row - confirm
+ long actual3 = CalculateObjectSizeUtils.getDataSize(data3);
+ Assert.assertEquals(expected3, actual3);
+ }
+}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index db446f0ce..02f364b4d 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -17,8 +17,6 @@
package org.apache.inlong.sort.doris.table;
-import java.util.LinkedHashSet;
-import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
@@ -55,6 +53,7 @@ import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.doris.model.RespContent;
import org.apache.inlong.sort.doris.util.DorisParseUtils;
@@ -68,10 +67,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
+import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -579,9 +580,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
try {
metricData.outputDirtyMetricsWithEstimate(database, table, 1,
- content.getBytes(StandardCharsets.UTF_8).length);
+ CalculateObjectSizeUtils.getDataSize(content));
} catch (Exception ex) {
- metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
+ metricData.invokeDirtyWithEstimate(dirtyData);
}
}
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 3d8531bd4..4cef0c5b8 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -44,13 +44,13 @@ import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
@@ -236,7 +236,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
element.hasTimestamp() ? element.getTimestamp() : null,
currentWatermark);
rowSize = rowSize + 1;
- dataSize = dataSize + element.getValue().toString().getBytes(StandardCharsets.UTF_8).length;
+ dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(element.getValue());
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Exception e) {
@@ -245,8 +245,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
throw new RuntimeException(e);
}
if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(1L,
- element.getValue().toString().getBytes(StandardCharsets.UTF_8).length);
+ sinkMetricData.invokeDirtyWithEstimate(element.getValue());
}
if (dirtySink != null) {
DirtyData.Builder<Object> builder = DirtyData.builder();
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index 1d62d8f56..be01bf403 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -52,6 +52,7 @@ import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -249,13 +250,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
try {
mutation = Preconditions.checkNotNull(mutationConverter.convertToMutation(value));
rowSize++;
- dataSize = dataSize + value.toString().getBytes(StandardCharsets.UTF_8).length;
+ dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(value);
} catch (Exception e) {
LOGGER.error("Convert to mutation error", e);
if (!dirtyOptions.ignoreDirty()) {
throw new RuntimeException(e);
}
- sinkMetricData.invokeDirty(1, value.toString().getBytes(StandardCharsets.UTF_8).length);
+ sinkMetricData.invokeDirtyWithEstimate(value);
if (dirtySink != null) {
DirtyData.Builder<Object> builder = DirtyData.builder();
try {
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index 4a359ad7e..d2603b90a 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -44,13 +44,13 @@ import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
@@ -241,7 +241,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
currentWatermark);
rowSize = rowSize + 1;
if (element.getValue() != null) {
- dataSize = dataSize + element.getValue().toString().getBytes(StandardCharsets.UTF_8).length;
+ dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(element.getValue());
}
} catch (IOException e) {
throw e;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 472944aae..6194b891d 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -229,7 +229,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
if (!dirtyOptions.ignoreDirty()) {
if (metricData != null) {
metricData.outputDirtyMetricsWithEstimate(tableId.namespace().toString(),
- tableId.name(), rowData.toString());
+ tableId.name(), rowData);
}
} else {
handleDirtyData(rowData.toString(), jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId,
@@ -384,7 +384,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
if (metricData != null) {
metricData.outputDirtyMetricsWithEstimate(
tableId.namespace().toString(), tableId.name(),
- rowData.toString());
+ rowData);
}
} else {
handleDirtyData(rowData.toString(), jsonNode,
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 3e29f20fc..32c4200ad 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -49,6 +49,7 @@ import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
import org.slf4j.Logger;
@@ -56,7 +57,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.Closeable;
-import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -258,7 +258,7 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
for (RowData data : recordWithSchema.getData()) {
String dataBaseName = tableId.namespace().toString();
String tableName = tableId.name();
- long size = data == null ? 0 : data.toString().getBytes(StandardCharsets.UTF_8).length;
+ long size = CalculateObjectSizeUtils.getDataSize(data);
try {
multipleWriters.get(tableId).processElement(data);
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index f7c2f74e5..307c71ddc 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -54,6 +54,7 @@ import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +64,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
-import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.concurrent.Executors;
@@ -309,7 +309,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
private void updateMetric(In record) {
rowSize++;
- dataSize += record.toString().getBytes(StandardCharsets.UTF_8).length;
+ dataSize += CalculateObjectSizeUtils.getDataSize(record);
}
private void resetStateAfterFlush() {
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
index c5ae7cc65..4f8a1ec97 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,7 +114,7 @@ public final class TableMetricStatementExecutor implements JdbcBatchStatementExe
// approximate since it may be inefficient to iterate over all writtenSize-1 elements.
long writtenBytes = 0L;
if (writtenSize > 0) {
- writtenBytes = (long) batch.get(0).toString().getBytes(StandardCharsets.UTF_8).length * writtenSize;
+ writtenBytes = CalculateObjectSizeUtils.getDataSize(batch.get(0)) * writtenSize;
}
batch.clear();
if (!multipleSink) {
@@ -181,7 +182,7 @@ public final class TableMetricStatementExecutor implements JdbcBatchStatementExe
st.addBatch();
st.executeBatch();
if (!multipleSink) {
- sinkMetricData.invoke(1, rowData.toString().getBytes().length);
+ sinkMetricData.invokeWithEstimate(rowData);
} else {
metric[0] += 1;
metric[1] += rowData.toString().getBytes().length;
@@ -200,13 +201,13 @@ public final class TableMetricStatementExecutor implements JdbcBatchStatementExe
if (dirtySinkHelper != null) {
dirtySinkHelper.invoke(rowData.toString(), DirtyType.BATCH_LOAD_ERROR, e);
}
- sinkMetricData.invokeDirty(1, rowData.toString().getBytes().length);
+ sinkMetricData.invokeDirtyWithEstimate(rowData);
} else {
if (dirtySinkHelper != null) {
dirtySinkHelper.invoke(rowData.toString(), DirtyType.BATCH_LOAD_ERROR, label, logtag, identifier, e);
}
metric[2] += 1;
- metric[3] += rowData.toString().getBytes().length;
+ metric[3] += CalculateObjectSizeUtils.getDataSize(rowData);
}
}