You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/04/28 03:20:07 UTC

[inlong] branch master updated: [INLONG-7906][Sort] Improve logic of calculation object byte size (#7907)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 704ef3eb7 [INLONG-7906][Sort] Improve logic of calculation object byte size (#7907)
704ef3eb7 is described below

commit 704ef3eb7440b1ad3a41ab851203f56c1fe650e3
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Fri Apr 28 11:20:01 2023 +0800

    [INLONG-7906][Sort] Improve logic of calculation object byte size (#7907)
---
 .../inlong/sort/base/metric/SinkMetricData.java    |  8 ++--
 .../inlong/sort/base/metric/SourceMetricData.java  | 14 +++----
 .../sort/base/metric/sub/SinkTableMetricData.java  |  7 ++--
 .../sort/base/util/CalculateObjectSizeUtils.java   | 47 ++++++++++++++++++++++
 .../base/util/CalculateObjectSizeUtilsTest.java    | 47 ++++++++++++++++++++++
 .../table/DorisDynamicSchemaOutputFormat.java      |  9 +++--
 .../filesystem/stream/AbstractStreamingWriter.java |  7 ++--
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  |  5 ++-
 .../hive/filesystem/AbstractStreamingWriter.java   |  4 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java |  4 +-
 .../sink/multiple/IcebergMultipleStreamWriter.java |  4 +-
 .../jdbc/internal/JdbcBatchingOutputFormat.java    |  4 +-
 .../internal/TableMetricStatementExecutor.java     |  9 +++--
 13 files changed, 130 insertions(+), 39 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 3e0ae04b3..a130be24a 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -25,7 +25,6 @@ import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 
-import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
@@ -36,6 +35,7 @@ import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
+import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
 
 /**
  * A collection class for handling metrics
@@ -253,13 +253,11 @@ public class SinkMetricData implements MetricData {
     }
 
     public void invokeWithEstimate(Object o) {
-        long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
-        invoke(1, size);
+        invoke(1, getDataSize(o));
     }
 
     public void invokeDirtyWithEstimate(Object o) {
-        long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
-        invokeDirty(1, size);
+        invokeDirty(1, getDataSize(o));
     }
 
     public void invoke(long rowCount, long rowSize) {
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index e5ffdf844..b5ca46713 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -17,24 +17,24 @@
 
 package org.apache.inlong.sort.base.metric;
 
-import java.util.List;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.inlong.audit.AuditOperator;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.Map;
+
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
+import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
 
 /**
  * A collection class for handling metrics
@@ -194,13 +194,11 @@ public class SourceMetricData implements MetricData {
     }
 
     public void outputMetricsWithEstimate(Object data) {
-        long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
-        outputMetrics(1, size);
+        outputMetrics(1, getDataSize(data));
     }
 
     public void outputMetricsWithEstimate(Object data, long dataTime) {
-        long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
-        outputMetrics(1, size, dataTime);
+        outputMetrics(1, getDataSize(data), dataTime);
     }
 
     public void outputMetrics(long rowCountSize, long rowDataSize) {
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index 92f9da48f..537bab033 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -38,6 +38,7 @@ import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
 
 /**
  * A collection class for handling sub metrics of table schema type
@@ -405,13 +406,11 @@ public class SinkTableMetricData extends SinkMetricData implements SinkSubMetric
      * @param data the dirty data
      */
     public void outputDirtyMetricsWithEstimate(String database, String table, Object data) {
-        long size = data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length;
-        outputDirtyMetrics(database, table, 1, size);
+        outputDirtyMetrics(database, table, 1, getDataSize(data));
     }
 
     public void outputDirtyMetricsWithEstimate(Object data) {
-        long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
-        invokeDirty(1, size);
+        invokeDirty(1, getDataSize(data));
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
new file mode 100644
index 000000000..0826eb2a9
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * calculate tool for object
+ */
+public class CalculateObjectSizeUtils {
+
+    private CalculateObjectSizeUtils() {
+    }
+
+    /**
+     * Estimates the byte size of {@code object} for metrics ({@code null} counts as 0). {@link BinaryRowData}
+     * does not implement {@link Object#toString} and {@code byte[].toString()} is only a type tag plus hash
+     * code, so both use their binary length; any other object uses the UTF-8 length of its toString().
+     */
+    public static long getDataSize(Object object) {
+        if (object == null) {
+            return 0L;
+        } else if (object instanceof BinaryRowData) {
+            return ((BinaryRowData) object).getSizeInBytes();
+        } else if (object instanceof byte[]) {
+            return ((byte[]) object).length;
+        }
+        return object.toString().getBytes(StandardCharsets.UTF_8).length;
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtilsTest.java
new file mode 100644
index 000000000..cc6b85ebb
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtilsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link CalculateObjectSizeUtils}
+ */
+public class CalculateObjectSizeUtilsTest {
+
+    /**
+     * Checks null input, an ordinary object measured through toString(), and a {@link BinaryRowData}
+     * measured through its binary size.
+     */
+    @Test
+    public void testGetDataSize() {
+        // null contributes nothing to the byte metrics
+        Assert.assertEquals(0L, CalculateObjectSizeUtils.getDataSize(null));
+
+        // "test" is four ASCII characters, i.e. four UTF-8 bytes
+        String text = "test";
+        Assert.assertEquals(4L, CalculateObjectSizeUtils.getDataSize(text));
+
+        // even an empty BinaryRowData occupies its 8-byte fixed-length header
+        BinaryRowData emptyRow = BinaryRowDataUtil.EMPTY_ROW;
+        Assert.assertEquals(8L, CalculateObjectSizeUtils.getDataSize(emptyRow));
+    }
+}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index db446f0ce..02f364b4d 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.sort.doris.table;
 
-import java.util.LinkedHashSet;
-import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
@@ -55,6 +53,7 @@ import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.doris.model.RespContent;
 import org.apache.inlong.sort.doris.util.DorisParseUtils;
@@ -68,10 +67,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Set;
 import java.util.StringJoiner;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -579,9 +580,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
 
         try {
             metricData.outputDirtyMetricsWithEstimate(database, table, 1,
-                    content.getBytes(StandardCharsets.UTF_8).length);
+                    CalculateObjectSizeUtils.getDataSize(content));
         } catch (Exception ex) {
-            metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
+            metricData.invokeDirtyWithEstimate(dirtyData);
         }
     }
 
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 3d8531bd4..4cef0c5b8 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -44,13 +44,13 @@ import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
@@ -236,7 +236,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
                     element.hasTimestamp() ? element.getTimestamp() : null,
                     currentWatermark);
             rowSize = rowSize + 1;
-            dataSize = dataSize + element.getValue().toString().getBytes(StandardCharsets.UTF_8).length;
+            dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(element.getValue());
         } catch (IOException e) {
             throw new RuntimeException(e);
         } catch (Exception e) {
@@ -245,8 +245,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
                 throw new RuntimeException(e);
             }
             if (sinkMetricData != null) {
-                sinkMetricData.invokeDirty(1L,
-                        element.getValue().toString().getBytes(StandardCharsets.UTF_8).length);
+                sinkMetricData.invokeDirtyWithEstimate(element.getValue());
             }
             if (dirtySink != null) {
                 DirtyData.Builder<Object> builder = DirtyData.builder();
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index 1d62d8f56..be01bf403 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -52,6 +52,7 @@ import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -249,13 +250,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
             try {
                 mutation = Preconditions.checkNotNull(mutationConverter.convertToMutation(value));
                 rowSize++;
-                dataSize = dataSize + value.toString().getBytes(StandardCharsets.UTF_8).length;
+                dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(value);
             } catch (Exception e) {
                 LOGGER.error("Convert to mutation error", e);
                 if (!dirtyOptions.ignoreDirty()) {
                     throw new RuntimeException(e);
                 }
-                sinkMetricData.invokeDirty(1, value.toString().getBytes(StandardCharsets.UTF_8).length);
+                sinkMetricData.invokeDirtyWithEstimate(value);
                 if (dirtySink != null) {
                     DirtyData.Builder<Object> builder = DirtyData.builder();
                     try {
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index 4a359ad7e..d2603b90a 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -44,13 +44,13 @@ import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
@@ -241,7 +241,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
                     currentWatermark);
             rowSize = rowSize + 1;
             if (element.getValue() != null) {
-                dataSize = dataSize + element.getValue().toString().getBytes(StandardCharsets.UTF_8).length;
+                dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(element.getValue());
             }
         } catch (IOException e) {
             throw e;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 472944aae..6194b891d 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -229,7 +229,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
             if (!dirtyOptions.ignoreDirty()) {
                 if (metricData != null) {
                     metricData.outputDirtyMetricsWithEstimate(tableId.namespace().toString(),
-                            tableId.name(), rowData.toString());
+                            tableId.name(), rowData);
                 }
             } else {
                 handleDirtyData(rowData.toString(), jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId,
@@ -384,7 +384,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                                             if (metricData != null) {
                                                 metricData.outputDirtyMetricsWithEstimate(
                                                         tableId.namespace().toString(), tableId.name(),
-                                                        rowData.toString());
+                                                        rowData);
                                             }
                                         } else {
                                             handleDirtyData(rowData.toString(), jsonNode,
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 3e29f20fc..32c4200ad 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -49,6 +49,7 @@ import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
 import org.slf4j.Logger;
@@ -56,7 +57,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
-import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -258,7 +258,7 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
                 for (RowData data : recordWithSchema.getData()) {
                     String dataBaseName = tableId.namespace().toString();
                     String tableName = tableId.name();
-                    long size = data == null ? 0 : data.toString().getBytes(StandardCharsets.UTF_8).length;
+                    long size = CalculateObjectSizeUtils.getDataSize(data);
 
                     try {
                         multipleWriters.get(tableId).processElement(data);
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index f7c2f74e5..307c71ddc 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -54,6 +54,7 @@ import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,7 +64,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.Field;
-import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.concurrent.Executors;
@@ -309,7 +309,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
 
     private void updateMetric(In record) {
         rowSize++;
-        dataSize += record.toString().getBytes(StandardCharsets.UTF_8).length;
+        dataSize += CalculateObjectSizeUtils.getDataSize(record);
     }
 
     private void resetStateAfterFlush() {
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
index c5ae7cc65..4f8a1ec97 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
 import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,7 +114,7 @@ public final class TableMetricStatementExecutor implements JdbcBatchStatementExe
             // approximate since it may be inefficient to iterate over all writtenSize-1 elements.
             long writtenBytes = 0L;
             if (writtenSize > 0) {
-                writtenBytes = (long) batch.get(0).toString().getBytes(StandardCharsets.UTF_8).length * writtenSize;
+                writtenBytes = CalculateObjectSizeUtils.getDataSize(batch.get(0)) * writtenSize;
             }
             batch.clear();
             if (!multipleSink) {
@@ -181,7 +182,7 @@ public final class TableMetricStatementExecutor implements JdbcBatchStatementExe
                 st.addBatch();
                 st.executeBatch();
                 if (!multipleSink) {
-                    sinkMetricData.invoke(1, rowData.toString().getBytes().length);
+                    sinkMetricData.invokeWithEstimate(rowData);
                 } else {
                     metric[0] += 1;
-                    metric[1] += rowData.toString().getBytes().length;
+                    metric[1] += CalculateObjectSizeUtils.getDataSize(rowData);
@@ -200,13 +201,13 @@ public final class TableMetricStatementExecutor implements JdbcBatchStatementExe
             if (dirtySinkHelper != null) {
                 dirtySinkHelper.invoke(rowData.toString(), DirtyType.BATCH_LOAD_ERROR, e);
             }
-            sinkMetricData.invokeDirty(1, rowData.toString().getBytes().length);
+            sinkMetricData.invokeDirtyWithEstimate(rowData);
         } else {
             if (dirtySinkHelper != null) {
                 dirtySinkHelper.invoke(rowData.toString(), DirtyType.BATCH_LOAD_ERROR, label, logtag, identifier, e);
             }
             metric[2] += 1;
-            metric[3] += rowData.toString().getBytes().length;
+            metric[3] += CalculateObjectSizeUtils.getDataSize(rowData);
         }
     }