You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2023/11/21 14:39:16 UTC

(doris-flink-connector) branch master updated: [feature] refactor serializer for multitable load (#234)

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 943a638  [feature] refactor serializer for multitable load (#234)
943a638 is described below

commit 943a63873f1038ad2ae226e9a0d6eba7e086d82a
Author: wudi <67...@qq.com>
AuthorDate: Tue Nov 21 22:39:10 2023 +0800

    [feature] refactor serializer for multitable load (#234)
    
    * refactor serializer
    * unified path
---
 .../org/apache/doris/flink/sink/DorisSink.java     |  3 +-
 .../doris/flink/sink/batch/DorisBatchSink.java     |  3 +-
 .../doris/flink/sink/batch/DorisBatchWriter.java   | 31 +++++++--------
 .../doris/flink/sink/writer/DorisWriter.java       | 44 ++++++----------------
 .../flink/sink/writer/SchemaChangeHelper.java      |  2 +-
 .../{ => serializer}/DorisRecordSerializer.java    |  9 +++--
 .../JsonDebeziumSchemaSerializer.java              | 10 +++--
 .../serializer/RecordWithMetaSerializer.java       | 44 ++++++++++++++++++++++
 .../writer/{ => serializer}/RowDataSerializer.java |  7 ++--
 .../writer/{ => serializer}/RowSerializer.java     |  8 ++--
 .../{ => serializer}/SimpleStringSerializer.java   |  8 ++--
 .../doris/flink/table/DorisDynamicTableSink.java   |  2 +-
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  2 +-
 .../apache/doris/flink/CDCSchemaChangeExample.java |  2 +-
 .../flink/DorisIntranetAccessSinkExample.java      |  2 +-
 .../apache/doris/flink/DorisSinkBatchExample.java  |  2 +-
 .../org/apache/doris/flink/DorisSinkExample.java   |  2 +-
 .../doris/flink/DorisSinkExampleRowData.java       |  2 +-
 .../doris/flink/DorisSinkMultiTableExample.java    |  4 +-
 .../flink/DorisSinkStreamMultiTableExample.java    |  4 +-
 .../convert/DorisRowConverterTest.java             |  6 +--
 .../doris/flink/sink/writer/TestDorisWriter.java   |  1 +
 .../writer/TestJsonDebeziumSchemaSerializer.java   | 13 +++++--
 .../flink/sink/writer/TestRowDataSerializer.java   |  9 +++--
 .../doris/flink/sink/writer/TestRowSerializer.java |  9 +++--
 25 files changed, 135 insertions(+), 94 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index 3adcd9e..ec2b736 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -22,7 +22,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.committer.DorisCommitter;
-import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import org.apache.doris.flink.sink.writer.DorisWriter;
 import org.apache.doris.flink.sink.writer.DorisWriterState;
 import org.apache.doris.flink.sink.writer.DorisWriterStateSerializer;
@@ -140,6 +140,7 @@ public class DorisSink<IN>
         public DorisSink<IN> build() {
             Preconditions.checkNotNull(dorisOptions);
             Preconditions.checkNotNull(dorisExecutionOptions);
+            Preconditions.checkNotNull(serializer);
             if(dorisReadOptions == null) {
                 dorisReadOptions = DorisReadOptions.builder().build();
             }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
index 37d3973..eea67b8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
@@ -20,7 +20,7 @@ package org.apache.doris.flink.sink.batch;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.util.Preconditions;
@@ -87,6 +87,7 @@ public class DorisBatchSink<IN> implements Sink<IN> {
         public DorisBatchSink<IN> build() {
             Preconditions.checkNotNull(dorisOptions);
             Preconditions.checkNotNull(dorisExecutionOptions);
+            Preconditions.checkNotNull(serializer);
             if(dorisReadOptions == null) {
                 dorisReadOptions = DorisReadOptions.builder().build();
             }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 6b2ce02..08a58cd 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -20,10 +20,11 @@ package org.apache.doris.flink.sink.batch;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -31,8 +32,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Objects;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -92,24 +91,20 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
     @Override
     public void write(IN in, Context context) throws IOException, InterruptedException {
         checkFlushException();
-        if(in instanceof RecordWithMeta){
-            RecordWithMeta row = (RecordWithMeta) in;
-            if(StringUtils.isNullOrWhitespaceOnly(row.getTable())
-                    ||StringUtils.isNullOrWhitespaceOnly(row.getDatabase())
-                    ||row.getRecord() == null){
-                LOG.warn("Record or meta format is incorrect, ignore record db:{}, table:{}, row:{}", row.getDatabase(), row.getTable(), row.getRecord());
-                return;
-            }
-            batchStreamLoad.writeRecord(row.getDatabase(), row.getTable(), row.getRecord().getBytes(StandardCharsets.UTF_8));
+        String db = this.database;
+        String tbl = this.table;
+        Tuple2<String, byte[]> rowTuple = serializer.serialize(in);
+        if(rowTuple == null || rowTuple.f1 == null){
+            //ddl or value is null
             return;
         }
-
-        byte[] serialize = serializer.serialize(in);
-        if(Objects.isNull(serialize)){
-            //ddl record
-            return;
+        //multi table load
+        if(rowTuple.f0 != null){
+            String[] tableInfo = rowTuple.f0.split("\\.");
+            db = tableInfo[0];
+            tbl = tableInfo[1];
         }
-        batchStreamLoad.writeRecord(database, table, serialize);
+        batchStreamLoad.writeRecord(db, tbl, rowTuple.f1);
     }
     @Override
     public void flush(boolean flush) throws IOException, InterruptedException {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index c5ce847..cb330c3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -27,7 +27,7 @@ import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpUtil;
-import org.apache.doris.flink.sink.batch.RecordWithMeta;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.StatefulSink;
@@ -39,14 +39,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -152,12 +150,17 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
     @Override
     public void write(IN in, Context context) throws IOException {
         checkLoadException();
-        Tuple2<String, byte[]> rowTuple = serializeRecord(in);
-        String tableKey = rowTuple.f0;
-        byte[] serializeRow = rowTuple.f1;
-        if(serializeRow == null){
+        String tableKey = dorisOptions.getTableIdentifier();
+
+        Tuple2<String, byte[]> rowTuple = serializer.serialize(in);
+        if(rowTuple == null || rowTuple.f1 == null){
+            //ddl or value is null
             return;
         }
+        //multi table load
+        if(rowTuple.f0 != null){
+            tableKey = rowTuple.f0;
+        }
 
         DorisStreamLoad streamLoader = getStreamLoader(tableKey);
         if(!loadingMap.containsKey(tableKey)) {
@@ -168,32 +171,7 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
             loadingMap.put(tableKey, true);
             globalLoading = true;
         }
-        streamLoader.writeRecord(serializeRow);
-    }
-
-    private Tuple2<String, byte[]> serializeRecord(IN in) throws IOException {
-        String tableKey = dorisOptions.getTableIdentifier();
-        byte[] serializeRow = null;
-        if(serializer != null) {
-            serializeRow = serializer.serialize(in);
-            if(Objects.isNull(serializeRow)){
-                //ddl record by JsonDebeziumSchemaSerializer
-                return Tuple2.of(tableKey, null);
-            }
-        }
-        //multi table load
-        if(in instanceof RecordWithMeta){
-            RecordWithMeta row = (RecordWithMeta) in;
-            if(StringUtils.isBlank(row.getTable())
-                    || StringUtils.isBlank(row.getDatabase())
-                    || row.getRecord() == null){
-                LOG.warn("Record or meta format is incorrect, ignore record db:{}, table:{}, row:{}", row.getDatabase(), row.getTable(), row.getRecord());
-                return Tuple2.of(tableKey, null);
-            }
-            tableKey = row.getDatabase() + "." + row.getTable();
-            serializeRow = row.getRecord().getBytes(StandardCharsets.UTF_8);
-        }
-        return Tuple2.of(tableKey, serializeRow);
+        streamLoader.writeRecord(rowTuple.f1);
     }
 
     @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
index 8e6307b..c1380c7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
@@ -109,7 +109,7 @@ public class SchemaChangeHelper {
         return ddlSchemas;
     }
 
-    static class DDLSchema {
+    public static class DDLSchema {
         private final String columnName;
         private final boolean isDropColumn;
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisRecordSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
similarity index 83%
rename from flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisRecordSerializer.java
rename to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
index 92b72f2..aa2b63d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisRecordSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.sink.writer;
+package org.apache.doris.flink.sink.writer.serializer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -29,8 +31,9 @@ public interface DorisRecordSerializer<T> extends Serializable {
     /**
      * define how to convert record into byte array.
      * @param record
-     * @return byte array
+     * @return [tableIdentifer,byte array]
      * @throws IOException
      */
-    byte[] serialize(T record) throws IOException;
+    Tuple2<String, byte[]> serialize(T record) throws IOException;
+
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
similarity index 98%
rename from flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
rename to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index bf7b81f..2171e84 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.sink.writer;
+package org.apache.doris.flink.sink.writer.serializer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -33,6 +33,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.HttpGetWithEntity;
+import org.apache.doris.flink.sink.writer.SchemaChangeHelper;
 import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
@@ -41,6 +42,7 @@ import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
 import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.StringUtils;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -132,7 +134,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     @Override
-    public byte[] serialize(String record) throws IOException {
+    public Tuple2<String, byte[]> serialize(String record) throws IOException {
         LOG.debug("received debezium json data {} :", record);
         JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
         String op = extractJsonNode(recordRoot, "op");
@@ -157,7 +159,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
                 addDeleteSign(valueMap, false);
                 break;
             case OP_UPDATE:
-                return extractUpdate(recordRoot);
+                return Tuple2.of(null, extractUpdate(recordRoot));
             case OP_DELETE:
                 valueMap = extractBeforeRow(recordRoot);
                 addDeleteSign(valueMap, true);
@@ -166,7 +168,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
                 LOG.error("parse record fail, unknown op {} in {}", op, record);
                 return null;
         }
-        return objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
+        return Tuple2.of(null, objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
     }
 
     /**
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RecordWithMetaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RecordWithMetaSerializer.java
new file mode 100644
index 0000000..21608a7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RecordWithMetaSerializer.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class RecordWithMetaSerializer implements DorisRecordSerializer<RecordWithMeta> {
+    private static final Logger LOG = LoggerFactory.getLogger(RecordWithMetaSerializer.class);
+
+    @Override
+    public Tuple2<String, byte[]> serialize(RecordWithMeta record) throws IOException {
+        if(StringUtils.isBlank(record.getTable())
+                || StringUtils.isBlank(record.getDatabase())
+                || record.getRecord() == null){
+            LOG.warn("Record or meta format is incorrect, ignore record db:{}, table:{}, row:{}",
+                    record.getDatabase(), record.getTable(), record.getRecord());
+            return null;
+        }
+        String tableKey = record.getDatabase() + "." + record.getTable();
+        return Tuple2.of(tableKey, record.getRecord().getBytes(StandardCharsets.UTF_8));
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
similarity index 95%
rename from flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
rename to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
index 78e34aa..d29830e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
@@ -15,11 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.sink.writer;
+package org.apache.doris.flink.sink.writer.serializer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
 import org.apache.doris.flink.sink.EscapeHandler;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
@@ -59,7 +60,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
     }
 
     @Override
-    public byte[] serialize(RowData record) throws IOException{
+    public Tuple2<String, byte[]> serialize(RowData record) throws IOException{
         int maxIndex = Math.min(record.getArity(), fieldNames.length);
         String valString;
         if (JSON.equals(type)) {
@@ -69,7 +70,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
         } else {
             throw new IllegalArgumentException("The type " + type + " is not supported!");
         }
-        return valString.getBytes(StandardCharsets.UTF_8);
+        return Tuple2.of(null, valString.getBytes(StandardCharsets.UTF_8));
     }
 
     public String buildJsonString(RowData record, int maxIndex) throws IOException {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowSerializer.java
similarity index 95%
rename from flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java
rename to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowSerializer.java
index 3a07951..3a0e610 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowSerializer.java
@@ -15,16 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.sink.writer;
+package org.apache.doris.flink.sink.writer.serializer;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.conversion.RowRowConverter;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
+
 import java.io.IOException;
+
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
 
@@ -53,7 +55,7 @@ public class RowSerializer implements DorisRecordSerializer<Row> {
     }
 
     @Override
-    public byte[] serialize(Row record) throws IOException{
+    public Tuple2<String, byte[]> serialize(Row record) throws IOException{
         RowData rowDataRecord = this.rowRowConverter.toInternal(record);
         return this.rowDataSerializer.serialize(rowDataRecord);
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SimpleStringSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/SimpleStringSerializer.java
similarity index 79%
rename from flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SimpleStringSerializer.java
rename to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/SimpleStringSerializer.java
index 7e5b960..206e641 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SimpleStringSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/SimpleStringSerializer.java
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.sink.writer;
+package org.apache.doris.flink.sink.writer.serializer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -26,7 +28,7 @@ import java.nio.charset.StandardCharsets;
 public class SimpleStringSerializer implements DorisRecordSerializer<String> {
 
     @Override
-    public byte[] serialize(String record) throws IOException {
-        return record.getBytes(StandardCharsets.UTF_8);
+    public Tuple2<String, byte[]> serialize(String record) throws IOException {
+        return Tuple2.of(null, record.getBytes(StandardCharsets.UTF_8));
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 66d0227..a961c3c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -23,7 +23,7 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.DorisSink;
 import org.apache.doris.flink.sink.batch.DorisBatchSink;
-import org.apache.doris.flink.sink.writer.RowDataSerializer;
+import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 99c45eb..f1579b0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -23,7 +23,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
 import org.apache.doris.flink.table.DorisConfigOptions;
 import org.apache.doris.flink.tools.cdc.mysql.ParsingProcessFunction;
 import org.apache.flink.configuration.Configuration;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
index 62578d3..2f491e2 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
@@ -25,7 +25,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
 import org.apache.doris.flink.utils.DateToStringConverter;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java
index 61ad1dd..b044b50 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java
@@ -21,7 +21,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.functions.MapFunction;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
index 4a04d78..0efcb4e 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
@@ -20,7 +20,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.batch.DorisBatchSink;
-import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
index 9c459b7..7bc8302 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
@@ -20,7 +20,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java
index 79d36c5..cd4a6f5 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java
@@ -20,7 +20,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.sink.DorisSink;
 import org.apache.doris.flink.sink.writer.LoadConstants;
-import org.apache.doris.flink.sink.writer.RowDataSerializer;
+import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java
index eade292..bc9861c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java
@@ -21,6 +21,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.batch.DorisBatchSink;
 import org.apache.doris.flink.sink.batch.RecordWithMeta;
+import org.apache.doris.flink.sink.writer.serializer.RecordWithMetaSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
@@ -67,7 +68,8 @@ public class DorisSinkMultiTableExample {
 
         builder.setDorisReadOptions(readOptionBuilder.build())
                 .setDorisExecutionOptions(executionBuilder.build())
-                .setDorisOptions(dorisBuilder.build());
+                .setDorisOptions(dorisBuilder.build())
+                .setSerializer(new RecordWithMetaSerializer());
 
 //        RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
 //        RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java
index a884ea2..4d54f4e 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java
@@ -21,6 +21,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisSink;
 import org.apache.doris.flink.sink.batch.RecordWithMeta;
+import org.apache.doris.flink.sink.writer.serializer.RecordWithMetaSerializer;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -61,7 +62,8 @@ public class DorisSinkStreamMultiTableExample {
 
         builder.setDorisReadOptions(readOptionBuilder.build())
                 .setDorisExecutionOptions(executionBuilder.build())
-                .setDorisOptions(dorisBuilder.build());
+                .setDorisOptions(dorisBuilder.build())
+                .setSerializer(new RecordWithMetaSerializer());
 
 //        RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
 //        RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
index e56a40e..d433384 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
@@ -17,8 +17,8 @@
 package org.apache.doris.flink.deserialization.convert;
 
 import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
-import org.apache.doris.flink.sink.writer.RowDataSerializer;
-import org.apache.doris.flink.sink.writer.RowDataSerializer.Builder;
+import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
+import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer.Builder;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -77,7 +77,7 @@ public class DorisRowConverterTest implements Serializable {
                 .setFieldDelimiter("|")
                 .setFieldNames(new String[]{"f1","f2","f3","f4","f5","f6","f7","f8","f9","f10","f11","f12","f13","f14","f15","f16"})
                 .build();
-        String s = new String(serializer.serialize(rowData));
+        String s = new String(serializer.serialize(rowData).f1);
         Assert.assertEquals("\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:00:00.0|2021-01-01 08:00:00.0|2021-01-01|a|doris", s);
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 36f98c8..a4fb64e 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -23,6 +23,7 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpTestUtil;
 import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index 11cac1c..f1e072e 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -31,6 +31,7 @@ import org.apache.doris.flink.rest.models.Field;
 import org.apache.doris.flink.rest.models.Schema;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -70,7 +71,8 @@ public class TestJsonDebeziumSchemaSerializer {
     public void testSerializeInsert() throws IOException {
         // insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01 10:01:03');
         byte[] serializedValue = serializer.serialize(
-                "{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\": [...]
+                "{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\": [...]
+                .f1;
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8),
                 new TypeReference<Map<String, String>>() {
                 });
@@ -88,7 +90,8 @@ public class TestJsonDebeziumSchemaSerializer {
     public void testSerializeUpdate() throws IOException {
         // update t1 set name='doris-update' WHERE id =1;
         byte[] serializedValue = serializer.serialize(
-                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\ [...]
+                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\ [...]
+                .f1;
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8),
                 new TypeReference<Map<String, String>>() {
                 });
@@ -108,7 +111,8 @@ public class TestJsonDebeziumSchemaSerializer {
                 .build();
         // update t1 set name='doris-update' WHERE id =1;
         byte[] serializedValue = serializer.serialize(
-                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\ [...]
+                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\ [...]
+                .f1;
         String row = new String(serializedValue, StandardCharsets.UTF_8);
         String[] split = row.split("\n");
         Map<String, String> valueMap = objectMapper.readValue(split[1], new TypeReference<Map<String, String>>() {
@@ -129,7 +133,8 @@ public class TestJsonDebeziumSchemaSerializer {
     @Test
     public void testSerializeDelete() throws IOException {
         byte[] serializedValue = serializer.serialize(
-                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"t [...]
+                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"t [...]
+                .f1;
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8),
                 new TypeReference<Map<String, String>>() {
                 });
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
index e8d5ccb..6769a72 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.sink.writer;
 
+import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
 import org.apache.flink.table.types.DataType;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -57,7 +58,7 @@ public class TestRowDataSerializer {
         RowDataSerializer.Builder builder = RowDataSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(false);
         RowDataSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(rowData);
+        byte[] serializedValue = serializer.serialize(rowData).f1;
         Assert.assertArrayEquals("3|test|60.2".getBytes(StandardCharsets.UTF_8), serializedValue);
     }
 
@@ -66,7 +67,7 @@ public class TestRowDataSerializer {
         RowDataSerializer.Builder builder = RowDataSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(false);
         RowDataSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(rowData);
+        byte[] serializedValue = serializer.serialize(rowData).f1;
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
         Assert.assertEquals("3", valueMap.get("id"));
@@ -79,7 +80,7 @@ public class TestRowDataSerializer {
         RowDataSerializer.Builder builder = RowDataSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(true);
         RowDataSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(rowData);
+        byte[] serializedValue = serializer.serialize(rowData).f1;
         Assert.assertArrayEquals("3|test|60.2|0".getBytes(StandardCharsets.UTF_8), serializedValue);
     }
 
@@ -88,7 +89,7 @@ public class TestRowDataSerializer {
         RowDataSerializer.Builder builder = RowDataSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(true);
         RowDataSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(rowData);
+        byte[] serializedValue = serializer.serialize(rowData).f1;
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
         Assert.assertEquals("3", valueMap.get("id"));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java
index 6c07289..4dfe265 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.sink.writer;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.sink.writer.serializer.RowSerializer;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
@@ -55,7 +56,7 @@ public class TestRowSerializer {
         RowSerializer.Builder builder = RowSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(false);
         RowSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(row);
+        byte[] serializedValue = serializer.serialize(row).f1;
         Assert.assertArrayEquals("3|test|60.2".getBytes(StandardCharsets.UTF_8), serializedValue);
     }
 
@@ -64,7 +65,7 @@ public class TestRowSerializer {
         RowSerializer.Builder builder = RowSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(false);
         RowSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(row);
+        byte[] serializedValue = serializer.serialize(row).f1;
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
         Assert.assertEquals("3", valueMap.get("id"));
@@ -77,7 +78,7 @@ public class TestRowSerializer {
         RowSerializer.Builder builder = RowSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(true);
         RowSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(row);
+        byte[] serializedValue = serializer.serialize(row).f1;
         Assert.assertArrayEquals("3|test|60.2|0".getBytes(StandardCharsets.UTF_8), serializedValue);
     }
 
@@ -86,7 +87,7 @@ public class TestRowSerializer {
         RowSerializer.Builder builder = RowSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(true);
         RowSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(row);
+        byte[] serializedValue = serializer.serialize(row).f1;
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
         Assert.assertEquals("3", valueMap.get("id"));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org