Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/08 15:59:33 UTC

[GitHub] [flink-connector-mongodb] twalthr commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

twalthr commented on code in PR #1:
URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1043369287


##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.source.MongoSource;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
+import org.apache.flink.connector.mongodb.table.serialization.MongoRowDataDeserializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** A {@link DynamicTableSource} for MongoDB. */
+@Internal
+public class MongoDynamicTableSource
+        implements ScanTableSource,
+                LookupTableSource,
+                SupportsProjectionPushDown,
+                SupportsLimitPushDown {
+
+    private final MongoConnectionOptions connectionOptions;
+    private final MongoReadOptions readOptions;
+    @Nullable private final LookupCache lookupCache;
+    private final int lookupMaxRetries;
+    private final long lookupRetryIntervalMs;
+    private DataType physicalRowDataType;
+    private int limit = -1;
+
+    public MongoDynamicTableSource(
+            MongoConnectionOptions connectionOptions,
+            MongoReadOptions readOptions,
+            @Nullable LookupCache lookupCache,
+            int lookupMaxRetries,
+            long lookupRetryIntervalMs,
+            DataType physicalRowDataType) {
+        this.connectionOptions = connectionOptions;
+        this.readOptions = readOptions;
+        this.lookupCache = lookupCache;
+        checkArgument(
+                lookupMaxRetries >= 0,
+                String.format(
+                        "The '%s' must be larger than or equal to 0.",
+                        LookupOptions.MAX_RETRIES.key()));
+        checkArgument(
+                lookupRetryIntervalMs > 0,
+                String.format("The '%s' must be larger than 0.", LOOKUP_RETRY_INTERVAL.key()));
+        this.lookupMaxRetries = lookupMaxRetries;
+        this.lookupRetryIntervalMs = lookupRetryIntervalMs;
+        this.physicalRowDataType = physicalRowDataType;
+    }
+
+    @Override
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+        final List<String> keyNames = new ArrayList<>(context.getKeys().length);
+        for (int[] innerKeyArr : context.getKeys()) {
+            Preconditions.checkArgument(
+                    innerKeyArr.length == 1, "MongoDB only supports non-nested lookup keys yet");
+            keyNames.add(DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]));
+        }
+        final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+        MongoRowDataLookupFunction lookupFunction =
+                new MongoRowDataLookupFunction(
+                        connectionOptions,
+                        lookupMaxRetries,
+                        lookupRetryIntervalMs,
+                        DataType.getFieldNames(physicalRowDataType),
+                        DataType.getFieldDataTypes(physicalRowDataType),
+                        keyNames,
+                        rowType);
+        if (lookupCache != null) {
+            return PartialCachingLookupProvider.of(lookupFunction, lookupCache);
+        } else {
+            return LookupFunctionProvider.of(lookupFunction);
+        }
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+        final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+        final TypeInformation<RowData> typeInfo =
+                runtimeProviderContext.createTypeInformation(physicalRowDataType);
+
+        final MongoDeserializationSchema<RowData> deserializationSchema =
+                new MongoRowDataDeserializationSchema(rowType, typeInfo);
+
+        MongoSource<RowData> mongoSource =
+                MongoSource.<RowData>builder()
+                        .setUri(connectionOptions.getUri())
+                        .setDatabase(connectionOptions.getDatabase())
+                        .setCollection(connectionOptions.getCollection())
+                        .setFetchSize(readOptions.getFetchSize())
+                        .setCursorBatchSize(readOptions.getCursorBatchSize())
+                        .setNoCursorTimeout(readOptions.isNoCursorTimeout())
+                        .setPartitionStrategy(readOptions.getPartitionStrategy())
+                        .setPartitionSize(readOptions.getPartitionSize())
+                        .setSamplesPerPartition(readOptions.getSamplesPerPartition())
+                        .setLimit(limit)
+                        .setProjectedFields(DataType.getFieldNames(physicalRowDataType))
+                        .setDeserializationSchema(deserializationSchema)
+                        .build();
+
+        return SourceProvider.of(mongoSource);
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new MongoDynamicTableSource(
+                connectionOptions,
+                readOptions,
+                lookupCache,
+                lookupMaxRetries,
+                lookupRetryIntervalMs,
+                physicalRowDataType);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "MongoDB";
+    }
+
+    @Override
+    public void applyLimit(long limit) {
+        this.limit = (int) limit;
+    }
+
+    @Override
+    public boolean supportsNestedProjection() {
+        // planner doesn't support nested projection push down yet.
+        return false;
+    }
+
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+        this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType);

Review Comment:
   Use `producedDataType` directly?
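
   For illustration, a minimal sketch of that suggestion, keeping the current field name; the planner already supplies the projected type, so recomputing it via `Projection.of(...)` should be unnecessary:

   ```java
   // Sketch of the suggested simplification: assign the planner-provided
   // producedDataType instead of recomputing the projection.
   @Override
   public void applyProjection(int[][] projectedFields, DataType producedDataType) {
       this.physicalRowDataType = producedDataType;
   }
   ```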



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java:
##########
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table.converter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.mongodb.internal.HexUtils;
+import org.bson.BsonBinary;
+import org.bson.BsonBinarySubType;
+import org.bson.BsonDocument;
+import org.bson.BsonRegularExpression;
+import org.bson.BsonUndefined;
+import org.bson.BsonValue;
+import org.bson.codecs.BsonArrayCodec;
+import org.bson.codecs.EncoderContext;
+import org.bson.json.JsonWriter;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Tool class used to convert from {@link BsonValue} to {@link RowData}. */
+@Internal
+public class BsonToRowDataConverters {
+
+    // -------------------------------------------------------------------------------------
+    // Runtime Converters
+    // -------------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts {@link BsonValue} into objects of Flink Table & SQL internal
+     * data structures.
+     */
+    @FunctionalInterface
+    public interface BsonToRowDataConverter extends Serializable {
+        Object convert(BsonValue bsonValue);
+    }
+
+    // --------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Bson for
+    // sql-connector uber jars.
+    // --------------------------------------------------------------------------------
+
+    /** Creates a runtime converter which is null safe. */
+    public static BsonToRowDataConverter createNullableConverter(LogicalType type) {
+        return wrapIntoNullableInternalConverter(createConverter(type));
+    }
+
+    private static BsonToRowDataConverter wrapIntoNullableInternalConverter(
+            BsonToRowDataConverter bsonToRowDataConverter) {
+        return new BsonToRowDataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(BsonValue bsonValue) {
+                if (bsonValue == null || bsonValue.isNull() || bsonValue instanceof BsonUndefined) {
+                    return null;
+                }
+                if (bsonValue.isDecimal128() && bsonValue.asDecimal128().getValue().isNaN()) {
+                    return null;
+                }
+                return bsonToRowDataConverter.convert(bsonValue);
+            }
+        };
+    }
+
+    /** Creates a runtime converter which assumes the input object is not null. */
+    private static BsonToRowDataConverter createConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return null;
+                    }
+                };
+            case BOOLEAN:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToBoolean(bsonValue);
+                    }
+                };
+            case TINYINT:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToTinyInt(bsonValue);
+                    }
+                };
+            case SMALLINT:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToSmallInt(bsonValue);
+                    }
+                };
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToInt(bsonValue);
+                    }
+                };
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToLong(bsonValue);
+                    }
+                };
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return TimestampData.fromLocalDateTime(convertToLocalDateTime(bsonValue));
+                    }
+                };
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return TimestampData.fromInstant(convertToInstant(bsonValue));
+                    }
+                };
+            case FLOAT:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToFloat(bsonValue);
+                    }
+                };
+            case DOUBLE:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToDouble(bsonValue);
+                    }
+                };
+            case CHAR:
+            case VARCHAR:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return StringData.fromString(convertToString(bsonValue));
+                    }
+                };
+            case BINARY:
+            case VARBINARY:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToBinary(bsonValue);
+                    }
+                };
+            case DECIMAL:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        DecimalType decimalType = (DecimalType) type;
+                        BigDecimal decimalValue = convertToBigDecimal(bsonValue);
+                        return DecimalData.fromBigDecimal(
+                                decimalValue, decimalType.getPrecision(), decimalType.getScale());
+                    }
+                };
+            case ROW:
+                return createRowConverter((RowType) type);
+            case ARRAY:
+                return createArrayConverter((ArrayType) type);
+            case MAP:
+                return createMapConverter((MapType) type);
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private static BsonToRowDataConverter createRowConverter(RowType rowType) {
+        final BsonToRowDataConverter[] fieldConverters =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(BsonToRowDataConverters::createNullableConverter)
+                        .toArray(BsonToRowDataConverter[]::new);
+        final int arity = rowType.getFieldCount();
+        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+        return new BsonToRowDataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(BsonValue bsonValue) {
+                if (!bsonValue.isDocument()) {
+                    throw new IllegalArgumentException(
+                            "Unable to convert to rowType from unexpected value '"
+                                    + bsonValue
+                                    + "' of type "
+                                    + bsonValue.getBsonType());
+                }
+
+                BsonDocument document = bsonValue.asDocument();
+                GenericRowData row = new GenericRowData(arity);
+                for (int i = 0; i < arity; i++) {
+                    String fieldName = fieldNames[i];
+                    BsonValue fieldValue = document.get(fieldName);
+                    Object convertedField = fieldConverters[i].convert(fieldValue);
+                    row.setField(i, convertedField);
+                }
+                return row;
+            }
+        };
+    }
+
+    private static BsonToRowDataConverter createArrayConverter(ArrayType arrayType) {
+        final BsonToRowDataConverter elementConverter =
+                createNullableConverter(arrayType.getElementType());

Review Comment:
   Creating a nullable converter regardless of the nullability of the logical type can be dangerous. It means that nulls may potentially travel through the topology even though the schema says they can't. Either we do not allow `NOT NULL` constraints and check this in the `DynamicTableFactory`, or we stick to the semantics and produce an error if an unexpected null occurs.
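
   A minimal sketch of the strict alternative, assuming NOT NULL types should fail fast on unexpected nulls (reusing the PR's `createConverter` and `wrapIntoNullableInternalConverter` helpers):

   ```java
   public static BsonToRowDataConverter createNullableConverter(LogicalType type) {
       BsonToRowDataConverter converter = createConverter(type);
       if (type.isNullable()) {
           return wrapIntoNullableInternalConverter(converter);
       }
       // Strict path: the schema promises no nulls, so an unexpected null is an error.
       return new BsonToRowDataConverter() {
           private static final long serialVersionUID = 1L;

           @Override
           public Object convert(BsonValue bsonValue) {
               if (bsonValue == null || bsonValue.isNull() || bsonValue instanceof BsonUndefined) {
                   throw new IllegalStateException(
                           "Unexpected null value for NOT NULL type: " + type);
               }
               return converter.convert(bsonValue);
           }
       };
   }
   ```

   The factory-level alternative would instead reject `NOT NULL` columns up front, which keeps the runtime path unchanged.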



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java:
##########
@@ -0,0 +1,677 @@
+    private static BsonToRowDataConverter createArrayConverter(ArrayType arrayType) {
+        final BsonToRowDataConverter elementConverter =
+                createNullableConverter(arrayType.getElementType());
+
+        return new BsonToRowDataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(BsonValue bsonValue) {
+                if (!bsonValue.isArray()) {
+                    throw new IllegalArgumentException(
+                            "Unable to convert to arrayType from unexpected value '"
+                                    + bsonValue
+                                    + "' of type "
+                                    + bsonValue.getBsonType());
+                }
+
+                List<BsonValue> in = bsonValue.asArray();
+                final Object[] elementArray = new Object[in.size()];
+                for (int i = 0; i < in.size(); i++) {
+                    elementArray[i] = elementConverter.convert(in.get(i));
+                }
+                return new GenericArrayData(elementArray);
+            }
+        };
+    }
+
+    private static BsonToRowDataConverter createMapConverter(MapType mapType) {
+        LogicalType keyType = mapType.getKeyType();
+        checkArgument(keyType.supportsInputConversion(String.class));
+
+        LogicalType valueType = mapType.getValueType();
+        BsonToRowDataConverter valueConverter = createNullableConverter(valueType);
+
+        return new BsonToRowDataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(BsonValue bsonValue) {
+                if (!bsonValue.isDocument()) {
+                    throw new IllegalArgumentException(
+                            "Unable to convert to rowType from unexpected value '"
+                                    + bsonValue
+                                    + "' of type "
+                                    + bsonValue.getBsonType());
+                }
+
+                BsonDocument document = bsonValue.asDocument();
+                Map<StringData, Object> map = new HashMap<>();
+                for (String key : document.keySet()) {
+                    map.put(StringData.fromString(key), valueConverter.convert(document.get(key)));
+                }
+                return new GenericMapData(map);
+            }
+        };
+    }
+
+    private static boolean convertToBoolean(BsonValue bsonValue) {
+        if (bsonValue.isBoolean()) {
+            return bsonValue.asBoolean().getValue();
+        }
+        if (bsonValue.isInt32()) {

Review Comment:
   This logic seems to be out of scope: BSON defines a boolean type, and that is all we should support in this location. Parsing ints or strings is the responsibility of the SQL engine and can be done with the provided casting functions.
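
   A minimal sketch of the strict variant described above, mirroring the error-message style used elsewhere in this class:

   ```java
   // Accept only the BSON boolean type; leave int/string coercion to the
   // SQL engine's CAST functions.
   private static boolean convertToBoolean(BsonValue bsonValue) {
       if (bsonValue.isBoolean()) {
           return bsonValue.asBoolean().getValue();
       }
       throw new IllegalArgumentException(
               "Unable to convert to boolean from unexpected value '"
                       + bsonValue
                       + "' of type "
                       + bsonValue.getBsonType());
   }
   ```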



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.MongoSink;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters;
+import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters.RowDataToBsonConverter;
+import org.apache.flink.connector.mongodb.table.serialization.MongoRowDataSerializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.bson.BsonValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link DynamicTableSink} for MongoDB. */
+@Internal
+public class MongoDynamicTableSink implements DynamicTableSink {
+
+    private final MongoConnectionOptions connectionOptions;
+    private final MongoWriteOptions writeOptions;
+    @Nullable private final Integer parallelism;
+    private final DataType physicalRowDataType;
+    private final SerializableFunction<RowData, BsonValue> keyExtractor;
+
+    public MongoDynamicTableSink(
+            MongoConnectionOptions connectionOptions,
+            MongoWriteOptions writeOptions,
+            @Nullable Integer parallelism,
+            DataType physicalRowDataType,
+            SerializableFunction<RowData, BsonValue> keyExtractor) {
+        this.connectionOptions = checkNotNull(connectionOptions);
+        this.writeOptions = checkNotNull(writeOptions);
+        this.parallelism = parallelism;
+        this.physicalRowDataType = checkNotNull(physicalRowDataType);
+        this.keyExtractor = checkNotNull(keyExtractor);
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return ChangelogMode.upsert();

Review Comment:
   Shouldn't the changelog mode depend on the primary key? If we are not in sync with `AppendOnlyKeyExtractor`, an update-before and update-after might be partitioned as separate entities. When rereading, they would appear as two independent records, and thus source and sink would not consume/produce the same data.
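
   For illustration, a sketch of a key-dependent changelog mode; the `hasPrimaryKey` flag is a hypothetical field that the table factory would derive from the resolved schema, not part of this PR:

   ```java
   // Hypothetical sketch: derive the changelog mode from the primary key
   // instead of hardcoding upsert.
   @Override
   public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
       // With a primary key, updates can be keyed and deduplicated on rereading.
       if (hasPrimaryKey) {
           return ChangelogMode.upsert();
       }
       // Without a key there is nothing to upsert against; only accept inserts.
       return ChangelogMode.insertOnly();
   }
   ```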



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java:
##########
@@ -0,0 +1,209 @@
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+        this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType);

Review Comment:
   By the way, we try to have a naming convention for field names: `physicalRowDataType` = table schema, `producedDataType` = fields effectively coming out of the table source. Let's rename the member variable in this class to `producedDataType`.



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java:
##########
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table.converter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import com.mongodb.internal.HexUtils;
+import org.bson.BsonBinary;
+import org.bson.BsonBinarySubType;
+import org.bson.BsonDocument;
+import org.bson.BsonRegularExpression;
+import org.bson.BsonUndefined;
+import org.bson.BsonValue;
+import org.bson.codecs.BsonArrayCodec;
+import org.bson.codecs.EncoderContext;
+import org.bson.json.JsonWriter;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Tool class used to convert from {@link BsonValue} to {@link RowData}. */
+@Internal
+public class BsonToRowDataConverters {
+
+    // -------------------------------------------------------------------------------------
+    // Runtime Converters
+    // -------------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts {@link BsonValue} into objects of Flink Table & SQL internal
+     * data structures.
+     */
+    @FunctionalInterface
+    public interface BsonToRowDataConverter extends Serializable {
+        Object convert(BsonValue bsonValue);
+    }
+
+    // --------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Bson for
+    // sql-connector uber jars.
+    // --------------------------------------------------------------------------------
+
+    /** Creates a runtime converter which is null safe. */
+    public static BsonToRowDataConverter createNullableConverter(LogicalType type) {
+        return wrapIntoNullableInternalConverter(createConverter(type));
+    }
+
+    private static BsonToRowDataConverter wrapIntoNullableInternalConverter(
+            BsonToRowDataConverter bsonToRowDataConverter) {
+        return new BsonToRowDataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(BsonValue bsonValue) {
+                if (bsonValue == null || bsonValue.isNull() || bsonValue instanceof BsonUndefined) {
+                    return null;
+                }
+                if (bsonValue.isDecimal128() && bsonValue.asDecimal128().getValue().isNaN()) {
+                    return null;
+                }
+                return bsonToRowDataConverter.convert(bsonValue);
+            }
+        };
+    }
+
+    /** Creates a runtime converter which assumes the input object is not null. */
+    private static BsonToRowDataConverter createConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return null;
+                    }
+                };
+            case BOOLEAN:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToBoolean(bsonValue);
+                    }
+                };
+            case TINYINT:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToTinyInt(bsonValue);
+                    }
+                };
+            case SMALLINT:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToSmallInt(bsonValue);
+                    }
+                };
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToInt(bsonValue);
+                    }
+                };
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToLong(bsonValue);
+                    }
+                };
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return TimestampData.fromLocalDateTime(convertToLocalDateTime(bsonValue));
+                    }
+                };
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return TimestampData.fromInstant(convertToInstant(bsonValue));
+                    }
+                };
+            case FLOAT:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToFloat(bsonValue);
+                    }
+                };
+            case DOUBLE:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToDouble(bsonValue);
+                    }
+                };
+            case CHAR:
+            case VARCHAR:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return StringData.fromString(convertToString(bsonValue));
+                    }
+                };
+            case BINARY:
+            case VARBINARY:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        return convertToBinary(bsonValue);
+                    }
+                };
+            case DECIMAL:
+                return new BsonToRowDataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(BsonValue bsonValue) {
+                        DecimalType decimalType = (DecimalType) type;
+                        BigDecimal decimalValue = convertToBigDecimal(bsonValue);
+                        return DecimalData.fromBigDecimal(
+                                decimalValue, decimalType.getPrecision(), decimalType.getScale());
+                    }
+                };
+            case ROW:
+                return createRowConverter((RowType) type);
+            case ARRAY:
+                return createArrayConverter((ArrayType) type);
+            case MAP:
+                return createMapConverter((MapType) type);
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private static BsonToRowDataConverter createRowConverter(RowType rowType) {
+        final BsonToRowDataConverter[] fieldConverters =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(BsonToRowDataConverters::createNullableConverter)
+                        .toArray(BsonToRowDataConverter[]::new);
+        final int arity = rowType.getFieldCount();
+        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+        return new BsonToRowDataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(BsonValue bsonValue) {
+                if (!bsonValue.isDocument()) {
+                    throw new IllegalArgumentException(
+                            "Unable to convert to rowType from unexpected value '"
+                                    + bsonValue
+                                    + "' of type "
+                                    + bsonValue.getBsonType());
+                }
+
+                BsonDocument document = bsonValue.asDocument();
+                GenericRowData row = new GenericRowData(arity);
+                for (int i = 0; i < arity; i++) {
+                    String fieldName = fieldNames[i];
+                    BsonValue fieldValue = document.get(fieldName);
+                    Object convertedField = fieldConverters[i].convert(fieldValue);
+                    row.setField(i, convertedField);
+                }
+                return row;
+            }
+        };
+    }
+
+    private static BsonToRowDataConverter createArrayConverter(ArrayType arrayType) {
+        final Class<?> elementClass =
+                LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+        final BsonToRowDataConverter elementConverter =
+                createNullableConverter(arrayType.getElementType());
+
+        return new BsonToRowDataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(BsonValue bsonValue) {
+                if (!bsonValue.isArray()) {
+                    throw new IllegalArgumentException(
+                            "Unable to convert to arrayType from unexpected value '"
+                                    + bsonValue
+                                    + "' of type "
+                                    + bsonValue.getBsonType());
+                }
+
+                List<BsonValue> in = bsonValue.asArray();
+                final Object[] elementArray = (Object[]) Array.newInstance(elementClass, in.size());
+                for (int i = 0; i < in.size(); i++) {
+                    elementArray[i] = elementConverter.convert(in.get(i));
+                }
+                return new GenericArrayData(elementArray);
+            }
+        };
+    }
+
+    private static BsonToRowDataConverter createMapConverter(MapType mapType) {
+        LogicalType keyType = mapType.getKeyType();
+        checkArgument(keyType.supportsInputConversion(String.class));
+
+        LogicalType valueType = mapType.getValueType();
+        BsonToRowDataConverter valueConverter = createNullableConverter(valueType);
+
+        return new BsonToRowDataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(BsonValue bsonValue) {
+                if (!bsonValue.isDocument()) {
+                    throw new IllegalArgumentException(
+                            "Unable to convert to rowType from unexpected value '"
+                                    + bsonValue
+                                    + "' of type "
+                                    + bsonValue.getBsonType());
+                }
+
+                BsonDocument document = bsonValue.asDocument();
+                Map<StringData, Object> map = new HashMap<>();
+                for (String key : document.keySet()) {
+                    map.put(StringData.fromString(key), valueConverter.convert(document.get(key)));
+                }
+                return new GenericMapData(map);
+            }
+        };
+    }
+
+    private static boolean convertToBoolean(BsonValue bsonValue) {
+        if (bsonValue.isBoolean()) {
+            return bsonValue.asBoolean().getValue();
+        }
+        if (bsonValue.isInt32()) {
+            return bsonValue.asInt32().getValue() == 1;
+        }
+        if (bsonValue.isInt64()) {
+            return bsonValue.asInt64().getValue() == 1L;
+        }
+        if (bsonValue.isString()) {
+            return Boolean.parseBoolean(bsonValue.asString().getValue());
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to boolean from unexpected value '"
+                        + bsonValue
+                        + "' of type "
+                        + bsonValue.getBsonType());
+    }
+
+    private static byte convertToTinyInt(BsonValue bsonValue) {
+        if (bsonValue.isBoolean()) {
+            return bsonValue.asBoolean().getValue() ? (byte) 1 : (byte) 0;
+        }
+        if (bsonValue.isInt32()) {
+            return (byte) bsonValue.asInt32().getValue();
+        }
+        if (bsonValue.isInt64()) {
+            return (byte) bsonValue.asInt64().getValue();
+        }
+        if (bsonValue.isString()) {
+            return Byte.parseByte(bsonValue.asString().getValue());
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to tinyint from unexpected value '"
+                        + bsonValue
+                        + "' of type "
+                        + bsonValue.getBsonType());
+    }
+
+    private static short convertToSmallInt(BsonValue bsonValue) {
+        if (bsonValue.isBoolean()) {
+            return bsonValue.asBoolean().getValue() ? (short) 1 : (short) 0;
+        }
+        if (bsonValue.isInt32()) {
+            return (short) bsonValue.asInt32().getValue();
+        }
+        if (bsonValue.isInt64()) {
+            return (short) bsonValue.asInt64().getValue();
+        }
+        if (bsonValue.isString()) {
+            return Short.parseShort(bsonValue.asString().getValue());
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to smallint from unexpected value '"
+                        + bsonValue
+                        + "' of type "
+                        + bsonValue.getBsonType());
+    }
+
+    private static int convertToInt(BsonValue bsonValue) {
+        if (bsonValue.isNumber()) {
+            return bsonValue.asNumber().intValue();
+        }
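+        // Non-finite Decimal128 values (NaN, ±Infinity) are clamped to Integer.MIN_VALUE / MAX_VALUE.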
+        if (bsonValue.isDecimal128()) {
+            Decimal128 decimal128Value = bsonValue.asDecimal128().decimal128Value();
+            if (decimal128Value.isFinite()) {
+                return decimal128Value.intValue();
+            } else if (decimal128Value.isNegative()) {
+                return Integer.MIN_VALUE;
+            } else {
+                return Integer.MAX_VALUE;
+            }
+        }
+        if (bsonValue.isBoolean()) {
+            return bsonValue.asBoolean().getValue() ? 1 : 0;
+        }
+        if (bsonValue.isDateTime()) {
+            return (int) Instant.ofEpochMilli(bsonValue.asDateTime().getValue()).getEpochSecond();
+        }
+        if (bsonValue.isTimestamp()) {
+            return bsonValue.asTimestamp().getTime();
+        }
+        if (bsonValue.isString()) {
+            return Integer.parseInt(bsonValue.asString().getValue());
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to integer from unexpected value '"
+                        + bsonValue
+                        + "' of type "
+                        + bsonValue.getBsonType());
+    }
+
+    private static long convertToLong(BsonValue bsonValue) {
+        if (bsonValue.isNumber()) {
+            return bsonValue.asNumber().longValue();
+        }
+        if (bsonValue.isDecimal128()) {
+            Decimal128 decimal128Value = bsonValue.asDecimal128().decimal128Value();
+            if (decimal128Value.isFinite()) {
+                return decimal128Value.longValue();
+            } else if (decimal128Value.isNegative()) {
+                return Long.MIN_VALUE;
+            } else {
+                return Long.MAX_VALUE;
+            }
+        }
+        if (bsonValue.isBoolean()) {
+            return bsonValue.asBoolean().getValue() ? 1L : 0L;
+        }
+        if (bsonValue.isDateTime()) {
+            return bsonValue.asDateTime().getValue();
+        }
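+        // BsonTimestamp stores epoch seconds; scale to epoch milliseconds here.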
+        if (bsonValue.isTimestamp()) {
+            return bsonValue.asTimestamp().getTime() * 1000L;
+        }
+        if (bsonValue.isString()) {
+            return Long.parseLong(bsonValue.asString().getValue());
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to long from unexpected value '"
+                        + bsonValue
+                        + "' of type "
+                        + bsonValue.getBsonType());
+    }
+
+    private static double convertToDouble(BsonValue bsonValue) {
+        if (bsonValue.isNumber()) {
+            return bsonValue.asNumber().doubleValue();
+        }
+        if (bsonValue.isDecimal128()) {
+            Decimal128 decimal128Value = bsonValue.asDecimal128().decimal128Value();
+            if (decimal128Value.isFinite()) {
+                return decimal128Value.doubleValue();
+            } else if (decimal128Value.isNegative()) {
+                return -Double.MAX_VALUE;
+            } else {
+                return Double.MAX_VALUE;
+            }
+        }
+        if (bsonValue.isBoolean()) {
+            return bsonValue.asBoolean().getValue() ? 1 : 0;
+        }
+        if (bsonValue.isString()) {
+            return Double.parseDouble(bsonValue.asString().getValue());
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to double from unexpected value '"
+                        + bsonValue
+                        + "' of type "
+                        + bsonValue.getBsonType());
+    }
+
+    private static float convertToFloat(BsonValue bsonValue) {
+        if (bsonValue.isInt32()) {
+            return bsonValue.asInt32().getValue();
+        }
+        if (bsonValue.isInt64()) {

Review Comment:
   I share @zentol's opinion; I actually posted the same comment above. We should avoid conversion/casting logic in connectors and instead map types 1:1 where possible. I know that connectors have many types that Flink does not understand yet (or that there is no way to express); this is high on the agenda.
   
   Since MongoDB documents have a JSON-like string representation, the easiest solution is for a user to declare a field as STRING for data types that Flink does not support. This should make it possible to read values and post-process them with built-in JSON functions or UDFs. Is there a way in the BSON APIs to get the raw string-based representation out of it?
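   
   For illustration, a minimal (untested) sketch of what I have in mind, using the driver's JsonWriterSettings; wrapping the value in a one-field document is just a trick to serialize a standalone BsonValue:
   
   import org.bson.BsonDocument;
   import org.bson.BsonValue;
   import org.bson.json.JsonMode;
   import org.bson.json.JsonWriterSettings;
   
   public class BsonToJsonStringSketch {
   
       private static final JsonWriterSettings RELAXED_JSON =
               JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build();
   
       /** Renders an arbitrary BsonValue as its relaxed extended-JSON string. */
       public static String toJsonString(BsonValue value) {
           // BsonValue itself has no toJson(); reuse BsonDocument#toJson instead.
           return new BsonDocument("value", value).toJson(RELAXED_JSON);
       }
   }
   
   Such a STRING field could then be post-processed on the query side with the built-in JSON functions or a UDF.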



##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java:
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
+import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+import org.bson.BsonTimestamp;
+import org.bson.types.Decimal128;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.URI;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for {@link MongoDynamicTableSource}. */
+@Testcontainers
+public class MongoDynamicTableSourceITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSourceITCase.class);
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Container
+    private static final MongoDBContainer MONGO_CONTAINER =
+            MongoTestUtil.createMongoDBContainer(LOG);
+
+    public static final String TEST_DATABASE = "test";
+    public static final String TEST_COLLECTION = "mongo_table_source";
+
+    private static MongoClient mongoClient;
+
+    public static StreamExecutionEnvironment env;
+    public static StreamTableEnvironment tEnv;
+
+    @BeforeAll
+    static void beforeAll() {
+        mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
+
+        MongoCollection<BsonDocument> coll =
+                mongoClient
+                        .getDatabase(TEST_DATABASE)
+                        .getCollection(TEST_COLLECTION)
+                        .withDocumentClass(BsonDocument.class);
+
+        List<BsonDocument> testRecords = Arrays.asList(createTestData(1), createTestData(2));
+        coll.insertMany(testRecords);
+    }
+
+    @AfterAll
+    static void afterAll() {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+
+    @BeforeEach
+    void before() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+    }
+
+    @Test
+    public void testSource() {
+        tEnv.executeSql(createTestDDl(null));
+
+        Iterator<Row> collected = tEnv.executeSql("SELECT * FROM mongo_source").collect();
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        List<String> expected =
+                Stream.of(
+                                "+I[1, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]",
+                                "+I[2, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]")
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        assertThat(result).isEqualTo(expected);
+    }
+
+    @Test
+    public void testProject() {
+        tEnv.executeSql(createTestDDl(null));
+
+        Iterator<Row> collected = tEnv.executeSql("SELECT f1, f13 FROM mongo_source").collect();
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        List<String> expected =
+                Stream.of("+I[2, +I[13]]", "+I[2, +I[13]]").sorted().collect(Collectors.toList());
+
+        assertThat(result).isEqualTo(expected);
+    }
+
+    @Test
+    public void testLimit() {
+        tEnv.executeSql(createTestDDl(null));
+
+        Iterator<Row> collected = tEnv.executeSql("SELECT * FROM mongo_source LIMIT 1").collect();
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        Set<String> expected = new HashSet<>();
+        expected.add(
+                "+I[1, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]");
+        expected.add(
+                "+I[2, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]");
+
+        assertThat(result).hasSize(1);
+        assertThat(result).containsAnyElementsOf(expected);
+    }
+
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    public void testLookupJoin(Caching caching) throws Exception {
+        // Create MongoDB lookup table
+        Map<String, String> lookupOptions = new HashMap<>();
+        if (caching.equals(Caching.ENABLE_CACHE)) {
+            lookupOptions.put(LookupOptions.CACHE_TYPE.key(), "PARTIAL");
+            lookupOptions.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE.key(), "10min");
+            lookupOptions.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS.key(), "10min");
+            lookupOptions.put(LookupOptions.PARTIAL_CACHE_MAX_ROWS.key(), "100");
+            lookupOptions.put(LookupOptions.MAX_RETRIES.key(), "10");
+        }
+
+        tEnv.executeSql(createTestDDl(lookupOptions));
+
+        DataStream<Row> sourceStream =
+                env.fromCollection(
+                                Arrays.asList(
+                                        Row.of(1L, "Alice"),
+                                        Row.of(1L, "Alice"),
+                                        Row.of(2L, "Bob"),
+                                        Row.of(3L, "Charlie")))
+                        .returns(
+                                new RowTypeInfo(
+                                        new TypeInformation[] {Types.LONG, Types.STRING},
+                                        new String[] {"id", "name"}));
+
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("name", DataTypes.STRING())
+                        .columnByExpression("proctime", "PROCTIME()")
+                        .build();
+
+        tEnv.createTemporaryView("value_source", sourceStream, sourceSchema);
+
+        if (caching == Caching.ENABLE_CACHE) {
+            LookupCacheManager.keepCacheOnRelease(true);
+        }
+
+        // Execute lookup join
+        try (CloseableIterator<Row> iterator =
+                tEnv.executeSql(
+                                "SELECT S.id, S.name, D._id, D.f1, D.f2 FROM value_source"
+                                        + " AS S JOIN mongo_source for system_time as of S.proctime AS D ON S.id = D._id")
+                        .collect()) {
+            List<String> result =
+                    CollectionUtil.iteratorToList(iterator).stream()
+                            .map(Row::toString)
+                            .sorted()
+                            .collect(Collectors.toList());
+            List<String> expected =
+                    Arrays.asList(
+                            "+I[1, Alice, 1, 2, false]",
+                            "+I[1, Alice, 1, 2, false]",
+                            "+I[2, Bob, 2, 2, false]");
+
+            assertThat(result).hasSize(3);
+            assertThat(result).isEqualTo(expected);
+            if (caching == Caching.ENABLE_CACHE) {
+                // Validate cache
+                Map<String, LookupCacheManager.RefCountedCache> managedCaches =
+                        LookupCacheManager.getInstance().getManagedCaches();
+                assertThat(managedCaches).hasSize(1);
+                LookupCache cache =
+                        managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
+                validateCachedValues(cache);
+            }
+
+        } finally {
+            if (caching == Caching.ENABLE_CACHE) {
+                LookupCacheManager.getInstance().checkAllReleased();
+                LookupCacheManager.getInstance().clear();
+                LookupCacheManager.keepCacheOnRelease(false);
+            }
+        }
+    }
+
+    private static void validateCachedValues(LookupCache cache) {
+        // MongoDB supports projection push-down, so the cached rows have already been projected.
+        RowData key1 = GenericRowData.of(1L);
+        RowData value1 = GenericRowData.of(1L, StringData.fromString("2"), false);
+
+        RowData key2 = GenericRowData.of(2L);
+        RowData value2 = GenericRowData.of(2L, StringData.fromString("2"), false);
+
+        RowData key3 = GenericRowData.of(3L);
+
+        Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>();
+        expectedEntries.put(key1, Collections.singletonList(value1));
+        expectedEntries.put(key2, Collections.singletonList(value2));
+        expectedEntries.put(key3, Collections.emptyList());
+
+        LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedEntries);
+    }
+
+    private enum Caching {
+        ENABLE_CACHE,
+        DISABLE_CACHE
+    }
+
+    private static String createTestDDl(Map<String, String> extraOptions) {
+        Map<String, String> options = new HashMap<>();
+        options.put(CONNECTOR.key(), "mongodb");
+        options.put(URI.key(), MONGO_CONTAINER.getConnectionString());
+        options.put(DATABASE.key(), TEST_DATABASE);
+        options.put(COLLECTION.key(), TEST_COLLECTION);
+        if (extraOptions != null) {
+            options.putAll(extraOptions);
+        }
+
+        String optionString =
+                options.entrySet().stream()
+                        .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
+                        .collect(Collectors.joining(",\n"));
+
+        return String.join(
+                "\n",
+                Arrays.asList(
+                        "CREATE TABLE mongo_source",
+                        "(",
+                        "  _id BIGINT,",

Review Comment:
   Maybe I'm lacking some MongoDB knowledge here, but why do we allow `_id` to be BIGINT? Shouldn't this always be a hexadecimal string? I'm wondering whether we actually support round-trip semantics with this PR. A connector should feel like a database table, which means the content of the sink can be re-read by the source, ideally with exactly the same schema. Is this currently possible?
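   
   For reference, a hypothetical round-trip declaration (table name and option values here are illustrative, not from this PR) that keeps `_id` as a string, in the style of createTestDDl:
   
   // Hypothetical DDL: keeping _id as STRING should let MongoDB's default
   // ObjectId keys survive a sink -> source round trip with the same schema.
   tEnv.executeSql(
           String.join(
                   "\n",
                   "CREATE TABLE mongo_round_trip (",
                   "  _id STRING,",
                   "  f1 STRING,",
                   "  PRIMARY KEY (_id) NOT ENFORCED",
                   ") WITH (",
                   "  'connector' = 'mongodb',",
                   "  'uri' = '" + MONGO_CONTAINER.getConnectionString() + "',",
                   "  'database' = '" + TEST_DATABASE + "',",
                   "  'collection' = 'round_trip'",
                   ")"));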


