You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/27 10:42:06 UTC
[inlong] branch master updated: [INLONG-7700][Sort] Update some copy class file when update mongo-cdc version to 2.3 (#7705)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4d0c0b56e [INLONG-7700][Sort] Update some copy class file when update mongo-cdc version to 2.3 (#7705)
4d0c0b56e is described below
commit 4d0c0b56e531fe4268b753f5c49208fe409fdcf1
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Mon Mar 27 18:41:59 2023 +0800
[INLONG-7700][Sort] Update some copy class file when update mongo-cdc version to 2.3 (#7705)
---
.../MongoDBConnectorDeserializationSchema.java | 99 ++++++++++++----------
1 file changed, 55 insertions(+), 44 deletions(-)
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
index 3ef453545..4e0e09b7e 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
@@ -17,7 +17,6 @@
package org.apache.inlong.sort.cdc.mongodb.debezium.table;
-import com.google.gson.Gson;
import com.mongodb.client.model.changestream.OperationType;
import com.mongodb.internal.HexUtils;
import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
@@ -44,26 +43,26 @@ import org.apache.inlong.sort.cdc.mongodb.table.filter.RowKindValidator;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
-import org.bson.BsonArray;
import org.bson.BsonBinary;
import org.bson.BsonBinarySubType;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
-import org.bson.BsonDouble;
-import org.bson.BsonInt32;
-import org.bson.BsonInt64;
import org.bson.BsonMaxKey;
import org.bson.BsonMinKey;
import org.bson.BsonRegularExpression;
-import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.BsonUndefined;
import org.bson.BsonValue;
+import org.bson.codecs.BsonArrayCodec;
+import org.bson.codecs.EncoderContext;
+import org.bson.json.JsonWriter;
import org.bson.types.Decimal128;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.io.StringWriter;
+import java.io.Writer;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.Instant;
@@ -71,6 +70,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -86,12 +86,14 @@ public class MongoDBConnectorDeserializationSchema
protected static final Logger LOG = LoggerFactory.getLogger(MongoDBConnectorDeserializationSchema.class);
private static final long serialVersionUID = 1750787080613035184L;
- private static Gson gson = new Gson();
-
- /** TypeInformation of the produced {@link RowData}. */
+ /**
+ * TypeInformation of the produced {@link RowData}.
+ */
private final TypeInformation<RowData> resultTypeInfo;
- /** Local Time zone. */
+ /**
+ * Local Time zone.
+ */
private final ZoneId localTimeZone;
/**
@@ -101,7 +103,9 @@ public class MongoDBConnectorDeserializationSchema
*/
protected final DeserializationRuntimeConverter physicalConverter;
- /** Whether the deserializer needs to handle metadata columns. */
+ /**
+ * Whether the deserializer needs to handle metadata columns.
+ */
protected final boolean hasMetadata;
/**
@@ -252,7 +256,7 @@ public class MongoDBConnectorDeserializationSchema
if (valueSchema.field(fieldName) != null) {
String docString = value.getString(fieldName);
if (docString != null) {
- return BsonDocument.parse(value.getString(fieldName));
+ return BsonDocument.parse(docString);
}
}
return null;
@@ -293,12 +297,16 @@ public class MongoDBConnectorDeserializationSchema
Object convert(BsonValue docObj) throws Exception;
}
- /** Creates a runtime converter which is null safe. */
+ /**
+ * Creates a runtime converter which is null safe.
+ */
private DeserializationRuntimeConverter createConverter(LogicalType type) {
return wrapIntoNullableConverter(createNotNullConverter(type));
}
- /** Creates a runtime converter which assuming input object is not null. */
+ /**
+ * Creates a runtime converter which assumes the input object is not null.
+ */
private DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
@@ -546,6 +554,10 @@ public class MongoDBConnectorDeserializationSchema
return LocalDateTime.ofInstant(instant, localTimeZone);
}
+ private ZonedDateTime convertInstantToZonedDateTime(Instant instant) {
+ return ZonedDateTime.ofInstant(instant, localTimeZone);
+ }
+
private Instant convertToInstant(BsonTimestamp bsonTimestamp) {
return Instant.ofEpochSecond(bsonTimestamp.getTime());
}
@@ -639,6 +651,10 @@ public class MongoDBConnectorDeserializationSchema
if (docObj.isString()) {
return StringData.fromString(docObj.asString().getValue());
}
+ if (docObj.isDocument()) {
+ // convert document to json string
+ return StringData.fromString(docObj.asDocument().toJson());
+ }
if (docObj.isBinary()) {
BsonBinary bsonBinary = docObj.asBinary();
if (BsonBinarySubType.isUuid(bsonBinary.getType())) {
@@ -667,12 +683,35 @@ public class MongoDBConnectorDeserializationSchema
if (docObj.isDateTime()) {
Instant instant = convertToInstant(docObj.asDateTime());
return StringData.fromString(
- convertInstantToLocalDateTime(instant).format(ISO_OFFSET_DATE_TIME));
+ convertInstantToZonedDateTime(instant).format(ISO_OFFSET_DATE_TIME));
}
if (docObj.isTimestamp()) {
Instant instant = convertToInstant(docObj.asTimestamp());
return StringData.fromString(
- convertInstantToLocalDateTime(instant).format(ISO_OFFSET_DATE_TIME));
+ convertInstantToZonedDateTime(instant).format(ISO_OFFSET_DATE_TIME));
+ }
+ if (docObj.isArray()) {
+ // convert bson array to json string
+ Writer writer = new StringWriter();
+ JsonWriter jsonArrayWriter =
+ new JsonWriter(writer) {
+
+ @Override
+ public void writeStartArray() {
+ doWriteStartArray();
+ setState(State.VALUE);
+ }
+
+ @Override
+ public void writeEndArray() {
+ doWriteEndArray();
+ setState(getNextState());
+ }
+ };
+
+ new BsonArrayCodec()
+ .encode(jsonArrayWriter, docObj.asArray(), EncoderContext.builder().build());
+ return StringData.fromString(writer.toString());
}
if (docObj.isRegularExpression()) {
BsonRegularExpression regex = docObj.asRegularExpression();
@@ -694,34 +733,6 @@ public class MongoDBConnectorDeserializationSchema
if (docObj instanceof BsonMinKey || docObj instanceof BsonMaxKey) {
return StringData.fromString(docObj.getBsonType().name());
}
- // TODO unconfirmed feature
- if (docObj instanceof BsonArray) {
- String valueStr = "{";
- for (BsonValue bsonValue : ((BsonArray) docObj).getValues()) {
- switch (bsonValue.getBsonType()) {
- case DOUBLE:
- valueStr = valueStr + (int) ((BsonDouble) bsonValue).getValue() + ",";
- break;
- case INT32:
- valueStr = valueStr + ((BsonInt32) bsonValue).getValue() + ",";
- break;
- case INT64:
- valueStr = valueStr + ((BsonInt64) bsonValue).getValue() + ",";
- break;
- case STRING:
- valueStr = valueStr + ((BsonString) bsonValue).getValue() + ",";
- break;
- default:
- valueStr = valueStr + bsonValue + ",";
- }
- }
- valueStr = valueStr.substring(0, valueStr.length() - 1) + "}";
- return StringData.fromString(valueStr);
- }
- if (docObj.isDocument()) {
- return StringData.fromString(gson.toJson(docObj));
- }
-
throw new IllegalArgumentException(
"Unable to convert to string from unexpected value '"
+ docObj