You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "TyrantLucifer (via GitHub)" <gi...@apache.org> on 2023/05/12 02:17:31 UTC

[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

TyrantLucifer commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1191840276


##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java:
##########
@@ -20,70 +20,100 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
 
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(SeaTunnelSink.class)
 public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
-    private SeaTunnelRowType rowType;
+    private MongodbWriterOptions options;
 
-    private MongodbConfig params;
+    private SeaTunnelRowType seaTunnelRowType;
 
     @Override
-    public String getPluginName() {
-        return "MongoDB";
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        if (pluginConfig.hasPath(MongodbConfig.URI.key())
+                && pluginConfig.hasPath(MongodbConfig.DATABASE.key())
+                && pluginConfig.hasPath(MongodbConfig.COLLECTION.key())) {
+            String connection = pluginConfig.getString(MongodbConfig.URI.key());
+            String database = pluginConfig.getString(MongodbConfig.DATABASE.key());
+            String collection = pluginConfig.getString(MongodbConfig.COLLECTION.key());
+            MongodbWriterOptions.Builder builder =
+                    MongodbWriterOptions.builder()
+                            .withConnectString(connection)
+                            .withDatabase(database)
+                            .withCollection(collection)
+                            .withFlushSize(
+                                    pluginConfig.hasPath(MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key())

Review Comment:
   Use if-else statements instead of these inline ternary expressions.



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java:
##########
@@ -20,70 +20,100 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
 
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(SeaTunnelSink.class)
 public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
-    private SeaTunnelRowType rowType;
+    private MongodbWriterOptions options;
 
-    private MongodbConfig params;
+    private SeaTunnelRowType seaTunnelRowType;
 
     @Override
-    public String getPluginName() {
-        return "MongoDB";
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        if (pluginConfig.hasPath(MongodbConfig.URI.key())
+                && pluginConfig.hasPath(MongodbConfig.DATABASE.key())
+                && pluginConfig.hasPath(MongodbConfig.COLLECTION.key())) {
+            String connection = pluginConfig.getString(MongodbConfig.URI.key());
+            String database = pluginConfig.getString(MongodbConfig.DATABASE.key());
+            String collection = pluginConfig.getString(MongodbConfig.COLLECTION.key());
+            MongodbWriterOptions.Builder builder =
+                    MongodbWriterOptions.builder()
+                            .withConnectString(connection)
+                            .withDatabase(database)
+                            .withCollection(collection)
+                            .withFlushSize(
+                                    pluginConfig.hasPath(MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key())
+                                            ? pluginConfig.getInt(
+                                                    MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key())
+                                            : MongodbConfig.BUFFER_FLUSH_MAX_ROWS.defaultValue())
+                            .withBatchIntervalMs(
+                                    pluginConfig.hasPath(MongodbConfig.BUFFER_FLUSH_INTERVAL.key())
+                                            ? pluginConfig.getLong(
+                                                    MongodbConfig.BUFFER_FLUSH_INTERVAL.key())
+                                            : MongodbConfig.BUFFER_FLUSH_INTERVAL.defaultValue())
+                            .withUpsertKey(
+                                    pluginConfig.hasPath(MongodbConfig.UPSERT_KEY.key())
+                                            ? pluginConfig
+                                                    .getStringList(MongodbConfig.UPSERT_KEY.key())
+                                                    .toArray(new String[0])
+                                            : new String[] {})
+                            .withUpsertEnable(
+                                    pluginConfig.hasPath(MongodbConfig.UPSERT_ENABLE.key())
+                                            ? pluginConfig.getBoolean(
+                                                    MongodbConfig.UPSERT_ENABLE.key())
+                                            : MongodbConfig.UPSERT_ENABLE.defaultValue())
+                            .withRetryMax(
+                                    pluginConfig.hasPath(MongodbConfig.RETRY_MAX.key())
+                                            ? pluginConfig.getInt(MongodbConfig.RETRY_MAX.key())
+                                            : MongodbConfig.RETRY_MAX.defaultValue())
+                            .withRetryInterval(
+                                    pluginConfig.hasPath(MongodbConfig.RETRY_INTERVAL.key())
+                                            ? pluginConfig.getLong(
+                                                    MongodbConfig.RETRY_INTERVAL.key())
+                                            : MongodbConfig.RETRY_INTERVAL.defaultValue());
+            this.options = builder.build();
+        }
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(config, URI.key(), DATABASE.key(), COLLECTION.key());
-        if (!result.isSuccess()) {
-            throw new MongodbConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, result.getMsg()));
-        }
-
-        this.params = MongodbConfig.buildWithConfig(config);
+    public String getPluginName() {
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType rowType) {
-        this.rowType = rowType;
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
     }
 
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return rowType;
+        return seaTunnelRowType;
     }
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
             throws IOException {
-        boolean useSimpleTextSchema = CatalogTableUtil.buildSimpleTextSchema().equals(rowType);
-        return new MongodbSinkWriter(rowType, useSimpleTextSchema, params);
+        return new MongodbWriter(
+                new RowDataDocumentSerializer(

Review Comment:
   Pass the context into the writer; it will be used to collect dirty records.



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java:
##########
@@ -20,57 +20,125 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
-import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentRowDataDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.config.MongodbReadOptions;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.enumerator.MongodbSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.reader.MongodbReader;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplitStrategy;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.SamplingSplitStrategy;
+
+import org.bson.BsonDocument;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import java.util.ArrayList;
+
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(SeaTunnelSource.class)
-public class MongodbSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+public class MongodbSource
+        implements SeaTunnelSource<SeaTunnelRow, MongoSplit, ArrayList<MongoSplit>>,
+                SupportColumnProjection {
+
+    private static final long serialVersionUID = 1L;
+
+    private MongodbClientProvider clientProvider;
+
+    private DocumentDeserializer<SeaTunnelRow> deserializer;
+
+    private MongoSplitStrategy splitStrategy;
 
     private SeaTunnelRowType rowType;
 
-    private MongodbConfig params;
+    private MongodbReadOptions mongodbReadOptions;
 
     @Override
     public String getPluginName() {
-        return "MongoDB";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(config, URI.key(), DATABASE.key(), COLLECTION.key());
-        if (!result.isSuccess()) {
-            throw new MongodbConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, result.getMsg()));
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        if (pluginConfig.hasPath(MongodbConfig.URI.key())
+                && pluginConfig.hasPath(MongodbConfig.DATABASE.key())
+                && pluginConfig.hasPath(MongodbConfig.COLLECTION.key())) {
+            String connection = pluginConfig.getString(MongodbConfig.URI.key());
+            String database = pluginConfig.getString(MongodbConfig.DATABASE.key());
+            String collection = pluginConfig.getString(MongodbConfig.COLLECTION.key());
+            clientProvider =
+                    MongodbCollectionProvider.getBuilder()
+                            .connectionString(connection)
+                            .database(database)
+                            .collection(collection)
+                            .build();
         }
-        this.params = MongodbConfig.buildWithConfig(config);
-        if (config.hasPath(CatalogTableUtil.SCHEMA.key())) {
-            this.rowType = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+        if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
+            this.rowType = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
         } else {
             this.rowType = CatalogTableUtil.buildSimpleTextSchema();
         }
+        deserializer =
+                new DocumentRowDataDeserializer(
+                        rowType.getFieldNames(),
+                        rowType,
+                        pluginConfig.hasPath(MongodbConfig.FLAT_SYNC_STRING.key())
+                                ? pluginConfig.getBoolean(MongodbConfig.FLAT_SYNC_STRING.key())

Review Comment:
   The same as above



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/matchIT/mongodb_matchQuery_source_to_assert.conf:
##########
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config

Review Comment:
   the same as above



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongoKeyExtractor.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
+
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.SerializableFunction;
+
+import org.bson.BsonDocument;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class MongoKeyExtractor implements SerializableFunction<BsonDocument, BsonDocument> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static String[] upsertKey;

Review Comment:
   Why is this field static?



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/matchIT/mongodb_matchProjection_source_to_assert.conf:
##########
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config

Review Comment:
   the same as above



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java:
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.serde;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+import org.bson.json.JsonParseException;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.seatunnel.api.table.type.SqlType.NULL;
+import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.ENCODE_VALUE_FIELD;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.serde.BsonToRowDataConverters.fromBigDecimal;
+
+public class RowDataToBsonConverters implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FunctionalInterface
+    public interface RowDataToBsonConverter extends Serializable {
+        BsonDocument convert(SeaTunnelRow rowData);
+    }
+
+    public static RowDataToBsonConverter createConverter(SeaTunnelDataType<?> type) {
+        SerializableFunction<Object, BsonValue> internalRowConverter =
+                createNullSafeInternalConverter(type);
+        return new RowDataToBsonConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public BsonDocument convert(SeaTunnelRow rowData) {
+                return (BsonDocument) internalRowConverter.apply(rowData);
+            }
+        };
+    }
+
+    private static SerializableFunction<Object, BsonValue> createNullSafeInternalConverter(
+            SeaTunnelDataType<?> type) {
+        return wrapIntoNullSafeInternalConverter(createInternalConverter(type), type);
+    }
+
+    private static SerializableFunction<Object, BsonValue> wrapIntoNullSafeInternalConverter(
+            SerializableFunction<Object, BsonValue> internalConverter, SeaTunnelDataType<?> type) {
+        return new SerializableFunction<Object, BsonValue>() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public BsonValue apply(Object value) {
+                if (value == null || NULL.equals(type.getSqlType())) {
+                    throw new MongodbConnectorException(
+                            UNSUPPORTED_DATA_TYPE,
+                            "The column type is <"
+                                    + type
+                                    + ">, but a null value is being written into it");
+                } else {
+                    return internalConverter.apply(value);
+                }
+            }
+        };
+    }
+
+    private static SerializableFunction<Object, BsonValue> createInternalConverter(
+            SeaTunnelDataType<?> type) {
+        switch (type.getSqlType()) {
+            case NULL:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        return BsonNull.VALUE;
+                    }
+                };
+            case BOOLEAN:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        return new BsonBoolean((boolean) value);
+                    }
+                };
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        int intValue =
+                                value instanceof Byte
+                                        ? ((Byte) value) & 0xFF
+                                        : value instanceof Short
+                                                ? ((Short) value).intValue()
+                                                : (int) value;
+                        return new BsonInt32(intValue);
+                    }
+                };
+            case BIGINT:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        return new BsonInt64((long) value);
+                    }
+                };
+            case FLOAT:
+            case DOUBLE:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        double v =
+                                value instanceof Float
+                                        ? ((Float) value).doubleValue()
+                                        : (double) value;
+                        return new BsonDouble(v);
+                    }
+                };
+            case STRING:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        String val = value.toString();
+                        // try to parse out the mongodb specific data type from extend-json.
+                        if (val.startsWith("{")
+                                && val.endsWith("}")
+                                && val.contains(ENCODE_VALUE_FIELD)) {
+                            try {
+                                BsonDocument doc = BsonDocument.parse(val);
+                                if (doc.containsKey(ENCODE_VALUE_FIELD)) {
+                                    return doc.get(ENCODE_VALUE_FIELD);
+                                }
+                            } catch (JsonParseException e) {
+                                // invalid json format, fallback to store as a bson string.
+                                return new BsonString(value.toString());
+                            }
+                        }
+                        return new BsonString(value.toString());
+                    }
+                };
+            case BYTES:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        return new BsonBinary((byte[]) value);
+                    }
+                };
+            case DATE:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        LocalDate localDate = (LocalDate) value;
+                        return new BsonDateTime(
+                                localDate
+                                        .atStartOfDay(ZoneId.systemDefault())
+                                        .toInstant()
+                                        .toEpochMilli());
+                    }
+                };
+            case TIMESTAMP:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        LocalDateTime localDateTime = (LocalDateTime) value;
+                        return new BsonDateTime(
+                                localDateTime
+                                        .atZone(ZoneId.systemDefault())
+                                        .toInstant()
+                                        .toEpochMilli());
+                    }
+                };
+            case DECIMAL:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        DecimalType decimalType = (DecimalType) type;
+                        BigDecimal decimalVal = (BigDecimal) value;
+                        return new BsonDecimal128(
+                                new Decimal128(
+                                        Objects.requireNonNull(
+                                                fromBigDecimal(
+                                                        decimalVal,
+                                                        decimalType.getPrecision(),
+                                                        decimalType.getScale()))));
+                    }
+                };
+            case ARRAY:
+                return createArrayConverter((ArrayType<?, ?>) type);
+            case MAP:
+                MapType<?, ?> mapType = (MapType<?, ?>) type;
+                return createMapConverter(
+                        mapType.toString(), mapType.getKeyType(), mapType.getValueType());
+            case ROW:
+                return createRowConverter((SeaTunnelRowType) type);
+            default:
+                throw new UnsupportedOperationException("Not support to parse type: " + type);

Review Comment:
   Throw MongodbConnectorException here instead of UnsupportedOperationException.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/update_mongodb_to_assert.conf:
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######

Review Comment:
   the same as above



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_update_mongodb.conf:
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config

Review Comment:
   the same as above



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_updateMode_insert_mongodb.conf:
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config

Review Comment:
   the same as above



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/flatIT/fake_source_to_flat_mongodb.conf:
##########
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config

Review Comment:
   Please remove this comment line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org