You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/27 10:42:06 UTC

[inlong] branch master updated: [INLONG-7700][Sort] Update some copy class file when update mongo-cdc version to 2.3 (#7705)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d0c0b56e [INLONG-7700][Sort] Update some copy class file when update mongo-cdc version to 2.3 (#7705)
4d0c0b56e is described below

commit 4d0c0b56e531fe4268b753f5c49208fe409fdcf1
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Mon Mar 27 18:41:59 2023 +0800

    [INLONG-7700][Sort] Update some copy class file when update mongo-cdc version to 2.3 (#7705)
---
 .../MongoDBConnectorDeserializationSchema.java     | 99 ++++++++++++----------
 1 file changed, 55 insertions(+), 44 deletions(-)

diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
index 3ef453545..4e0e09b7e 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.cdc.mongodb.debezium.table;
 
-import com.google.gson.Gson;
 import com.mongodb.client.model.changestream.OperationType;
 import com.mongodb.internal.HexUtils;
 import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
@@ -44,26 +43,26 @@ import org.apache.inlong.sort.cdc.mongodb.table.filter.RowKindValidator;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
-import org.bson.BsonArray;
 import org.bson.BsonBinary;
 import org.bson.BsonBinarySubType;
 import org.bson.BsonDateTime;
 import org.bson.BsonDocument;
-import org.bson.BsonDouble;
-import org.bson.BsonInt32;
-import org.bson.BsonInt64;
 import org.bson.BsonMaxKey;
 import org.bson.BsonMinKey;
 import org.bson.BsonRegularExpression;
-import org.bson.BsonString;
 import org.bson.BsonTimestamp;
 import org.bson.BsonUndefined;
 import org.bson.BsonValue;
+import org.bson.codecs.BsonArrayCodec;
+import org.bson.codecs.EncoderContext;
+import org.bson.json.JsonWriter;
 import org.bson.types.Decimal128;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.io.StringWriter;
+import java.io.Writer;
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.time.Instant;
@@ -71,6 +70,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,12 +86,14 @@ public class MongoDBConnectorDeserializationSchema
     protected static final Logger LOG = LoggerFactory.getLogger(MongoDBConnectorDeserializationSchema.class);
     private static final long serialVersionUID = 1750787080613035184L;
 
-    private static Gson gson = new Gson();
-
-    /** TypeInformation of the produced {@link RowData}. */
+    /**
+     * TypeInformation of the produced {@link RowData}.
+     */
     private final TypeInformation<RowData> resultTypeInfo;
 
-    /** Local Time zone. */
+    /**
+     * Local Time zone.
+     */
     private final ZoneId localTimeZone;
 
     /**
@@ -101,7 +103,9 @@ public class MongoDBConnectorDeserializationSchema
      */
     protected final DeserializationRuntimeConverter physicalConverter;
 
-    /** Whether the deserializer needs to handle metadata columns. */
+    /**
+     * Whether the deserializer needs to handle metadata columns.
+     */
     protected final boolean hasMetadata;
 
     /**
@@ -252,7 +256,7 @@ public class MongoDBConnectorDeserializationSchema
         if (valueSchema.field(fieldName) != null) {
             String docString = value.getString(fieldName);
             if (docString != null) {
-                return BsonDocument.parse(value.getString(fieldName));
+                return BsonDocument.parse(docString);
             }
         }
         return null;
@@ -293,12 +297,16 @@ public class MongoDBConnectorDeserializationSchema
         Object convert(BsonValue docObj) throws Exception;
     }
 
-    /** Creates a runtime converter which is null safe. */
+    /**
+     * Creates a runtime converter which is null safe.
+     */
     private DeserializationRuntimeConverter createConverter(LogicalType type) {
         return wrapIntoNullableConverter(createNotNullConverter(type));
     }
 
-    /** Creates a runtime converter which assuming input object is not null. */
+    /**
+     * Creates a runtime converter which assumes the input object is not null.
+     */
     private DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
         switch (type.getTypeRoot()) {
             case NULL:
@@ -546,6 +554,10 @@ public class MongoDBConnectorDeserializationSchema
         return LocalDateTime.ofInstant(instant, localTimeZone);
     }
 
+    private ZonedDateTime convertInstantToZonedDateTime(Instant instant) {
+        return ZonedDateTime.ofInstant(instant, localTimeZone);
+    }
+
     private Instant convertToInstant(BsonTimestamp bsonTimestamp) {
         return Instant.ofEpochSecond(bsonTimestamp.getTime());
     }
@@ -639,6 +651,10 @@ public class MongoDBConnectorDeserializationSchema
         if (docObj.isString()) {
             return StringData.fromString(docObj.asString().getValue());
         }
+        if (docObj.isDocument()) {
+            // convert document to json string
+            return StringData.fromString(docObj.asDocument().toJson());
+        }
         if (docObj.isBinary()) {
             BsonBinary bsonBinary = docObj.asBinary();
             if (BsonBinarySubType.isUuid(bsonBinary.getType())) {
@@ -667,12 +683,35 @@ public class MongoDBConnectorDeserializationSchema
         if (docObj.isDateTime()) {
             Instant instant = convertToInstant(docObj.asDateTime());
             return StringData.fromString(
-                    convertInstantToLocalDateTime(instant).format(ISO_OFFSET_DATE_TIME));
+                    convertInstantToZonedDateTime(instant).format(ISO_OFFSET_DATE_TIME));
         }
         if (docObj.isTimestamp()) {
             Instant instant = convertToInstant(docObj.asTimestamp());
             return StringData.fromString(
-                    convertInstantToLocalDateTime(instant).format(ISO_OFFSET_DATE_TIME));
+                    convertInstantToZonedDateTime(instant).format(ISO_OFFSET_DATE_TIME));
+        }
+        if (docObj.isArray()) {
+            // convert bson array to json string
+            Writer writer = new StringWriter();
+            JsonWriter jsonArrayWriter =
+                    new JsonWriter(writer) {
+
+                        @Override
+                        public void writeStartArray() {
+                            doWriteStartArray();
+                            setState(State.VALUE);
+                        }
+
+                        @Override
+                        public void writeEndArray() {
+                            doWriteEndArray();
+                            setState(getNextState());
+                        }
+                    };
+
+            new BsonArrayCodec()
+                    .encode(jsonArrayWriter, docObj.asArray(), EncoderContext.builder().build());
+            return StringData.fromString(writer.toString());
         }
         if (docObj.isRegularExpression()) {
             BsonRegularExpression regex = docObj.asRegularExpression();
@@ -694,34 +733,6 @@ public class MongoDBConnectorDeserializationSchema
         if (docObj instanceof BsonMinKey || docObj instanceof BsonMaxKey) {
             return StringData.fromString(docObj.getBsonType().name());
         }
-        // TODO unconfirmed feature
-        if (docObj instanceof BsonArray) {
-            String valueStr = "{";
-            for (BsonValue bsonValue : ((BsonArray) docObj).getValues()) {
-                switch (bsonValue.getBsonType()) {
-                    case DOUBLE:
-                        valueStr = valueStr + (int) ((BsonDouble) bsonValue).getValue() + ",";
-                        break;
-                    case INT32:
-                        valueStr = valueStr + ((BsonInt32) bsonValue).getValue() + ",";
-                        break;
-                    case INT64:
-                        valueStr = valueStr + ((BsonInt64) bsonValue).getValue() + ",";
-                        break;
-                    case STRING:
-                        valueStr = valueStr + ((BsonString) bsonValue).getValue() + ",";
-                        break;
-                    default:
-                        valueStr = valueStr + bsonValue + ",";
-                }
-            }
-            valueStr = valueStr.substring(0, valueStr.length() - 1) + "}";
-            return StringData.fromString(valueStr);
-        }
-        if (docObj.isDocument()) {
-            return StringData.fromString(gson.toJson(docObj));
-        }
-
         throw new IllegalArgumentException(
                 "Unable to convert to string from unexpected value '"
                         + docObj