Posted to commits@inlong.apache.org by zi...@apache.org on 2022/04/29 07:17:42 UTC
[incubator-inlong] branch master updated: [INLONG-3996][Sort] Sort support all migrate for database
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 30d6c6445 [INLONG-3996][Sort] Sort support all migrate for database
30d6c6445 is described below
commit 30d6c6445746b15fd11abd1865f90c5458f39edb
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Fri Apr 29 15:17:36 2022 +0800
[INLONG-3996][Sort] Sort support all migrate for database
---
.../sort/protocol/node/format/CsvFormat.java | 12 +-
inlong-sort/sort-formats/format-json/pom.xml | 4 +
.../inlong/sort/formats/json/canal/CanalJson.java | 115 ++++++++++++++++
.../debezium/table/AppendMetadataCollector.java | 10 +-
.../table/RowDataDebeziumDeserializeSchema.java | 148 ++++++++++++++++++---
.../mysql/source/config/MySqlSourceOptions.java | 6 +
.../cdc/mysql/table/MySqlReadableMetadata.java | 148 +++++++++++++++------
.../mysql/table/MySqlTableInlongSourceFactory.java | 6 +-
.../flink/cdc/mysql/table/MySqlTableSource.java | 16 ++-
.../singletenant/flink/parser/AllMigrateTest.java | 109 +++++++++++++++
.../flink/parser/FlinkSqlParserTest.java | 50 +++----
11 files changed, 522 insertions(+), 102 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
index 232bda60c..60cfba67b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
@@ -40,7 +40,7 @@ public class CsvFormat implements Format {
@JsonProperty(value = "fieldDelimiter", defaultValue = ",")
private String fieldDelimiter;
- @JsonProperty(value = "disableQuoteCharacter", defaultValue = "false")
+ @JsonProperty(value = "disableQuoteCharacter", defaultValue = "true")
private Boolean disableQuoteCharacter;
@JsonProperty(value = "quoteCharacter", defaultValue = "\"")
private String quoteCharacter;
@@ -76,7 +76,7 @@ public class CsvFormat implements Format {
@JsonCreator
public CsvFormat() {
- this(",", false, "\"", false, true, ";", null, null);
+ this(",", true, null, false, true, ";", null, null);
}
@JsonIgnore
@@ -90,14 +90,16 @@ public class CsvFormat implements Format {
*
* @return options
*/
+ @Override
public Map<String, String> generateOptions() {
Map<String, String> options = new HashMap<>(16);
options.put("format", getFormat());
options.put("csv.field-delimiter", this.fieldDelimiter);
- if (this.disableQuoteCharacter != null) {
- options.put("csv.disable-quote-character", this.disableQuoteCharacter.toString());
+ options.put("csv.disable-quote-character", this.disableQuoteCharacter.toString());
+ // disable-quote-character and a quote character cannot be set at the same time
+ if (!this.disableQuoteCharacter) {
+ options.put("csv.quote-character", this.quoteCharacter);
}
- options.put("csv.quote-character", this.quoteCharacter);
if (this.allowComments != null) {
options.put("csv.allow-comments", this.allowComments.toString());
}
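For context on the CsvFormat change above: the default for disableQuoteCharacter flips to true, and csv.quote-character is now emitted only when quoting is enabled, because Flink's CSV format rejects a quote character while quoting is disabled. A minimal sketch of the resulting options under the new defaults (class name and values are illustrative, not part of the commit):

    import java.util.HashMap;
    import java.util.Map;

    public class CsvOptionsSketch {
        public static void main(String[] args) {
            boolean disableQuoteCharacter = true; // new default in this patch
            String quoteCharacter = null;         // the no-arg constructor now passes null
            Map<String, String> options = new HashMap<>();
            options.put("format", "csv");
            options.put("csv.field-delimiter", ",");
            options.put("csv.disable-quote-character", Boolean.toString(disableQuoteCharacter));
            // the two options conflict, so the quote character is set only when quoting is on
            if (!disableQuoteCharacter) {
                options.put("csv.quote-character", quoteCharacter);
            }
            // prints the three options above (HashMap iteration order is unspecified)
            System.out.println(options);
        }
    }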
diff --git a/inlong-sort/sort-formats/format-json/pom.xml b/inlong-sort/sort-formats/format-json/pom.xml
index ad346cabb..9a0846c96 100644
--- a/inlong-sort/sort-formats/format-json/pom.xml
+++ b/inlong-sort/sort-formats/format-json/pom.xml
@@ -72,6 +72,10 @@
<artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
</dependencies>
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
new file mode 100644
index 000000000..8ea272436
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import java.util.List;
+import java.util.Map;
+import lombok.Builder;
+
+@Builder
+public class CanalJson {
+
+ private List<Map<String, Object>> data;
+ private long es;
+ private String table;
+ private String type;
+ private String database;
+ private long ts;
+ private String sql;
+ private Map<String, String> mysqlType;
+ private boolean isDdl;
+ private List<String> pkNames;
+
+ public List<Map<String, Object>> getData() {
+ return data;
+ }
+
+ public void setData(List<Map<String, Object>> data) {
+ this.data = data;
+ }
+
+ public long getEs() {
+ return es;
+ }
+
+ public void setEs(long es) {
+ this.es = es;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public long getTs() {
+ return ts;
+ }
+
+ public void setTs(long ts) {
+ this.ts = ts;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ public void setSql(String sql) {
+ this.sql = sql;
+ }
+
+ public Map<String, String> getMysqlType() {
+ return mysqlType;
+ }
+
+ public void setMysqlType(Map<String, String> mysqlType) {
+ this.mysqlType = mysqlType;
+ }
+
+ public void setDdl(boolean ddl) {
+ isDdl = ddl;
+ }
+
+ public List<String> getPkNames() {
+ return pkNames;
+ }
+
+ public void setPkNames(List<String> pkNames) {
+ this.pkNames = pkNames;
+ }
+
+}
+
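The Lombok dependency added to the pom above is what backs the @Builder annotation on this class. A rough usage sketch, mirroring how the builder is exercised later in this commit by the DATA converter in MySqlReadableMetadata (all field values are made up):

    import java.util.Collections;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.inlong.sort.formats.json.canal.CanalJson;

    public class CanalJsonSketch {
        public static void main(String[] args) throws Exception {
            CanalJson canalJson = CanalJson.builder()
                    .data(Collections.singletonList(
                            Collections.singletonMap("id", (Object) 1)))
                    .database("test_database")
                    .table("test")
                    .type("INSERT")
                    .sql("")
                    .isDdl(false)
                    .es(1651216656000L)  // binlog event time, millis
                    .ts(1651216656321L)  // processing time, millis
                    .pkNames(Collections.singletonList("id"))
                    .mysqlType(Collections.singletonMap("id", "INT(11)"))
                    .build();
            // the serialized form is the canal-json payload used for migrate-all
            System.out.println(new ObjectMapper().writeValueAsString(canalJson));
        }
    }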
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java
index 6e83a8358..005254f6c 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java
@@ -39,17 +39,23 @@ public final class AppendMetadataCollector implements Collector<RowData>, Serial
public transient SourceRecord inputRecord;
public transient Collector<RowData> outputCollector;
+ private boolean migrateAll;
- public AppendMetadataCollector(MetadataConverter[] metadataConverters) {
+ public AppendMetadataCollector(MetadataConverter[] metadataConverters, boolean migrateAll) {
this.metadataConverters = metadataConverters;
+ this.migrateAll = migrateAll;
}
public void collect(RowData physicalRow, TableChange tableSchema) {
GenericRowData metaRow = new GenericRowData(metadataConverters.length);
for (int i = 0; i < metadataConverters.length; i++) {
- Object meta = metadataConverters[i].read(inputRecord, tableSchema);
+ Object meta = metadataConverters[i].read(inputRecord, tableSchema, physicalRow);
metaRow.setField(i, meta);
}
+ if (migrateAll) {
+ // all data is carried in the meta row, so empty the physical row
+ physicalRow = new GenericRowData(0);
+ }
RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
outputCollector.collect(outRow);
}
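Net effect of the collector change: when migrateAll is set, every column travels inside the single metadata field and the physical part of the joined row is left empty. A small sketch of the emitted row shape, assuming Flink's JoinedRowData as used above (values illustrative):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.utils.JoinedRowData;

    public class JoinedRowSketch {
        public static void main(String[] args) {
            GenericRowData physicalRow = new GenericRowData(0); // emptied when migrateAll is true
            GenericRowData metaRow = new GenericRowData(1);
            metaRow.setField(0, StringData.fromString("{\"type\":\"INSERT\"}"));
            RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
            System.out.println(outRow.getArity()); // 1 -> only the metadata field remains
        }
    }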
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index 217b4264c..4065c814e 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -24,17 +24,25 @@ import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.relational.history.TableChanges.TableChange;
+import io.debezium.time.Date;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
+import io.debezium.time.ZonedTimestamp;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
+import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.DecimalData;
@@ -49,11 +57,15 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.TemporalConversions;
+import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
@@ -62,7 +74,17 @@ import org.apache.kafka.connect.source.SourceRecord;
public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeserializationSchema<RowData> {
+ private static final Logger LOG = LoggerFactory.getLogger(RowDataDebeziumDeserializeSchema.class);
+
private static final long serialVersionUID = 2L;
+
+ private static final DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+
+ private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
+
+ private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern(
+ "yyyy-MM-dd HH:mm:ss");
+
/**
* TypeInformation of the produced {@link RowData}. *
*/
@@ -89,6 +111,10 @@ public final class RowDataDebeziumDeserializeSchema
*/
private final ValueValidator validator;
+ private boolean migrateAll;
+
+ private ZoneId serverTimeZone;
+
RowDataDebeziumDeserializeSchema(
RowType physicalDataType,
MetadataConverter[] metadataConverters,
@@ -96,9 +122,12 @@ public final class RowDataDebeziumDeserializeSchema
ValueValidator validator,
ZoneId serverTimeZone,
boolean appendSource,
- DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory,
+ boolean migrateAll) {
this.hasMetadata = checkNotNull(metadataConverters).length > 0;
- this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+ this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters, migrateAll);
+ this.migrateAll = migrateAll;
+ this.serverTimeZone = serverTimeZone;
this.physicalConverter =
createConverter(
checkNotNull(physicalDataType),
@@ -119,7 +148,7 @@ public final class RowDataDebeziumDeserializeSchema
/**
* Creates a runtime converter which is null safe.
*/
- private static DeserializationRuntimeConverter createConverter(
+ private DeserializationRuntimeConverter createConverter(
LogicalType type,
ZoneId serverTimeZone,
DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
@@ -130,7 +159,7 @@ public final class RowDataDebeziumDeserializeSchema
/**
* Creates a runtime converter which assuming input object is not null.
*/
- public static DeserializationRuntimeConverter createNotNullConverter(
+ public DeserializationRuntimeConverter createNotNullConverter(
LogicalType type,
ZoneId serverTimeZone,
DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
@@ -474,7 +503,7 @@ public final class RowDataDebeziumDeserializeSchema
};
}
- private static DeserializationRuntimeConverter createRowConverter(
+ private DeserializationRuntimeConverter createRowConverter(
RowType rowType,
ZoneId serverTimeZone,
DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
@@ -490,33 +519,104 @@ public final class RowDataDebeziumDeserializeSchema
.toArray(DeserializationRuntimeConverter[]::new);
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+ if (!migrateAll) {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) throws Exception {
+ Struct struct = (Struct) dbzObj;
+ int arity = fieldNames.length;
+ GenericRowData row = new GenericRowData(arity);
+ for (int i = 0; i < arity; i++) {
+ String fieldName = fieldNames[i];
+ Field field = schema.field(fieldName);
+ if (field == null) {
+ row.setField(i, null);
+ } else {
+ Object fieldValue = struct.getWithoutDefault(fieldName);
+ Schema fieldSchema = schema.field(fieldName).schema();
+ Object convertedField =
+ convertField(fieldConverters[i], fieldValue, fieldSchema);
+ row.setField(i, convertedField);
+ }
+ }
+ return row;
+ }
+ };
+ } else {
+ return getAllMigrationConverter();
+ }
+ }
+
+ @NotNull
+ private DeserializationRuntimeConverter getAllMigrationConverter() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Object dbzObj, Schema schema) throws Exception {
+ public Object convert(Object dbzObj, Schema schema) {
+
+ ConnectSchema connectSchema = (ConnectSchema) schema;
+ List<Field> fields = connectSchema.fields();
+
+ Map<String, Object> data = new HashMap<>();
Struct struct = (Struct) dbzObj;
- int arity = fieldNames.length;
- GenericRowData row = new GenericRowData(arity);
- for (int i = 0; i < arity; i++) {
- String fieldName = fieldNames[i];
- Field field = schema.field(fieldName);
- if (field == null) {
- row.setField(i, null);
- } else {
- Object fieldValue = struct.getWithoutDefault(fieldName);
- Schema fieldSchema = schema.field(fieldName).schema();
- Object convertedField =
- convertField(fieldConverters[i], fieldValue, fieldSchema);
- row.setField(i, convertedField);
+
+ for (Field field : fields) {
+ String fieldName = field.name();
+ Object fieldValue = struct.getWithoutDefault(fieldName);
+ Schema fieldSchema = schema.field(fieldName).schema();
+ String schemaName = fieldSchema.name();
+ if (schemaName != null) {
+ // plain types carry no schema name;
+ // a named schema here is one of Debezium's time schemas
+ fieldValue = getTimeValue(fieldValue, schemaName);
}
+ data.put(fieldName, fieldValue);
}
+
+ GenericRowData row = new GenericRowData(1);
+ row.setField(0, data);
+
return row;
}
};
}
+ /**
+ * Transforms a Debezium time value into the database's string representation.
+ *
+ * @param fieldValue the raw Debezium field value
+ * @param schemaName the Debezium time schema name
+ * @return the formatted value
+ */
+ private Object getTimeValue(Object fieldValue, String schemaName) {
+ switch (schemaName) {
+ case MicroTime.SCHEMA_NAME:
+ Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000);
+ fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant,serverTimeZone));
+ break;
+ case Date.SCHEMA_NAME:
+ fieldValue = dateFormatter.format(LocalDate.ofEpochDay((Integer) fieldValue));
+ break;
+ case ZonedTimestamp.SCHEMA_NAME:
+ ZonedDateTime zonedDateTime = ZonedDateTime.parse((CharSequence) fieldValue);
+ fieldValue = timestampFormatter.format(zonedDateTime
+ .withZoneSameInstant(serverTimeZone).toLocalDateTime());
+ break;
+ case Timestamp.SCHEMA_NAME:
+ Instant instantTime = Instant.ofEpochMilli((Long) fieldValue);
+ fieldValue = timestampFormatter.format(LocalDateTime.ofInstant(instantTime,
+ serverTimeZone));
+ break;
+ default:
+ LOG.error("unsupported time schema {}", schemaName);
+ }
+ return fieldValue;
+ }
+
private static Object convertField(
DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
throws Exception {
@@ -548,6 +648,7 @@ public final class RowDataDebeziumDeserializeSchema
deserialize(record, out, null);
}
+ @Override
public void deserialize(SourceRecord record, Collector<RowData> out,
TableChange tableSchema)
throws Exception {
@@ -632,6 +733,7 @@ public final class RowDataDebeziumDeserializeSchema
};
private ZoneId serverTimeZone = ZoneId.of("UTC");
private boolean appendSource = false;
+ private boolean migrateAll = false;
private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
DeserializationRuntimeConverterFactory.DEFAULT;
@@ -640,6 +742,11 @@ public final class RowDataDebeziumDeserializeSchema
return this;
}
+ public Builder setMigrateAll(boolean migrateAll) {
+ this.migrateAll = migrateAll;
+ return this;
+ }
+
public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
this.metadataConverters = metadataConverters;
return this;
@@ -679,7 +786,8 @@ public final class RowDataDebeziumDeserializeSchema
validator,
serverTimeZone,
appendSource,
- userDefinedConverterFactory);
+ userDefinedConverterFactory,
+ migrateAll);
}
}
}
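For reference, the conversions done by the new getTimeValue(...) boil down to formatting Debezium's epoch-based encodings in the server time zone. A standalone sketch with made-up values (UTC assumed; the real code uses the configured serverTimeZone):

    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;

    public class DebeziumTimeSketch {
        public static void main(String[] args) {
            ZoneId zone = ZoneId.of("UTC");
            DateTimeFormatter timestampFormatter =
                    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            // io.debezium.time.Timestamp: epoch millis -> "2022-04-29 07:17:36"
            System.out.println(timestampFormatter.format(
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(1651216656000L), zone)));
            // io.debezium.time.Date: epoch days -> "2022-04-29"
            System.out.println(DateTimeFormatter.ISO_DATE.format(LocalDate.ofEpochDay(19111)));
            // io.debezium.time.MicroTime: micros past midnight, divided to millis -> "07:17:36"
            System.out.println(DateTimeFormatter.ISO_TIME.format(
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(26256000000L / 1000), zone)));
        }
    }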
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/source/config/MySqlSourceOptions.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/source/config/MySqlSourceOptions.java
index 8cc49ff93..c1011aa5f 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/source/config/MySqlSourceOptions.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -189,6 +189,12 @@ public class MySqlSourceOptions {
.defaultValue(false)
.withDescription("Whether works as append source.");
+ public static final ConfigOption<Boolean> MIGRATE_ALL =
+ ConfigOptions.key("migrate-all")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to migrate all databases.");
+
// ----------------------------------------------------------------------------
// experimental options, won't add them to documentation
// ----------------------------------------------------------------------------
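In practice the new option is supplied together with append-mode on the MySQL extract node, as the AllMigrateTest added below does. A minimal sketch of the node properties (keys from this commit, values illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class MigrateAllOptionSketch {
        public static void main(String[] args) {
            Map<String, String> option = new HashMap<>();
            option.put("append-mode", "true"); // APPEND_MODE: work as an append source
            option.put("migrate-all", "true"); // MIGRATE_ALL: added by this commit
            System.out.println(option);
        }
    }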
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java
index 37cf1bd07..5e4d4bf3d 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java
@@ -20,18 +20,24 @@ package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.MetadataConverter;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -51,10 +57,7 @@ public enum MySqlReadableMetadata {
@Override
public Object read(SourceRecord record) {
- Struct messageStruct = (Struct) record.value();
- Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
- return StringData.fromString(
- sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
+ return StringData.fromString(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY));
}
}),
@@ -69,10 +72,7 @@ public enum MySqlReadableMetadata {
@Override
public Object read(SourceRecord record) {
- Struct messageStruct = (Struct) record.value();
- Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
- return StringData.fromString(
- sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
+ return StringData.fromString(getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY));
}
}),
@@ -96,26 +96,55 @@ public enum MySqlReadableMetadata {
}),
DATA(
- "meta.data",
- DataTypes.BIGINT(),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object read(SourceRecord record) {
- record.value().toString();
- Struct messageStruct = (Struct) record.value();
- Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
- return TimestampData.fromEpochMillis(
- (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
- }
-
- @Override
- public Object read(SourceRecord record,
- @org.jetbrains.annotations.Nullable TableChanges.TableChange tableSchema, RowData rowData) {
- return rowData.getLong(0);
+ "meta.data",
+ DataTypes.STRING(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ record.value().toString();
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(FieldName.TIMESTAMP);
+ sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
+ return TimestampData.fromEpochMillis(
+ (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+ }
+
+ @Override
+ public Object read(SourceRecord record,
+ @org.jetbrains.annotations.Nullable TableChanges.TableChange tableSchema, RowData rowData) {
+ // construct canal json
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+ // tableName
+ String tableName = getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY);
+ // databaseName
+ String databaseName = getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY);
+ // opTs
+ long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
+ // ts
+ long ts = (Long) messageStruct.get(Envelope.FieldName.TIMESTAMP);
+ // actual data
+ GenericRowData data = (GenericRowData) rowData;
+ Map<String, Object> field = (Map<String, Object>) data.getField(0);
+ List<Map<String, Object>> dataList = new ArrayList<>();
+ dataList.add(field);
+
+ CanalJson canalJson = CanalJson.builder()
+ .data(dataList).database(databaseName)
+ .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
+ .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
+ .type(getOpType(record)).build();
+
+ try {
+ ObjectMapper objectMapper = new ObjectMapper();
+ return StringData.fromString(objectMapper.writeValueAsString(canalJson));
+ } catch (Exception e) {
+ throw new IllegalStateException("exception occurred while building canal-json metadata", e);
}
- }),
+ }
+ }),
/**
* Name of the table that contains the row.
@@ -128,10 +157,7 @@ public enum MySqlReadableMetadata {
@Override
public Object read(SourceRecord record) {
- Struct messageStruct = (Struct) record.value();
- Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
- return StringData.fromString(
- sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
+ return getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY);
}
}),
@@ -146,10 +172,7 @@ public enum MySqlReadableMetadata {
@Override
public Object read(SourceRecord record) {
- Struct messageStruct = (Struct) record.value();
- Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
- return StringData.fromString(
- sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
+ return getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY);
}
}),
@@ -183,14 +206,7 @@ public enum MySqlReadableMetadata {
@Override
public Object read(SourceRecord record) {
- final Envelope.Operation op = Envelope.operationFor(record);
- if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
- return StringData.fromString("INSERT");
- } else if (op == Envelope.Operation.DELETE) {
- return StringData.fromString("DELETE");
- } else {
- return StringData.fromString("UPDATE");
- }
+ return StringData.fromString(getOpType(record));
}
}),
@@ -362,10 +378,54 @@ public enum MySqlReadableMetadata {
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
return TimestampData.fromEpochMillis(
- (Long) messageStruct.get(Envelope.FieldName.TIMESTAMP));
+ (Long) messageStruct.get(Envelope.FieldName.TIMESTAMP));
}
});
+ private static String getOpType(SourceRecord record) {
+ String opType;
+ final Envelope.Operation op = Envelope.operationFor(record);
+ if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+ opType = "INSERT";
+ } else if (op == Envelope.Operation.DELETE) {
+ opType = "DELETE";
+ } else {
+ opType = "UPDATE";
+ }
+ return opType;
+ }
+
+ private static List<String> getPkNames(@Nullable TableChanges.TableChange tableSchema) {
+ if (tableSchema == null) {
+ return null;
+ }
+ return tableSchema.getTable().primaryKeyColumnNames();
+ }
+
+ public static Map<String, String> getMysqlType(@Nullable TableChanges.TableChange tableSchema) {
+ if (tableSchema == null) {
+ return null;
+ }
+ Map<String, String> mysqlType = new HashMap<>();
+ final Table table = tableSchema.getTable();
+ table.columns()
+ .forEach(
+ column -> {
+ mysqlType.put(
+ column.name(),
+ String.format(
+ "%s(%d)",
+ column.typeName(),
+ column.length()));
+ });
+ return mysqlType;
+ }
+
+ private static String getMetaData(SourceRecord record, String tableNameKey) {
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+ return sourceStruct.getString(tableNameKey);
+ }
private final String key;
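Taken together, the DATA converter now serializes each change event into one canal-json record. For a single-row insert the payload would look roughly as follows (all values made up; exact fields and ordering depend on the serializer):

    {"data":[{"id":1,"name":"jacket"}],"es":1651216656000,"table":"test",
    "type":"INSERT","database":"test_database","ts":1651216656321,"sql":"",
    "mysqlType":{"id":"INT(11)","name":"VARCHAR(32)"},"isDdl":false,"pkNames":["id"]}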
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index a7a7bb2eb..9d4382e29 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -28,6 +28,7 @@ import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.
import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.DATABASE_NAME;
import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.HOSTNAME;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.MIGRATE_ALL;
import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.PASSWORD;
import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.PORT;
import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
@@ -137,6 +138,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
final boolean appendSource = config.get(APPEND_MODE);
+ final boolean migrateAll = config.get(MIGRATE_ALL);
double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
@@ -179,7 +181,8 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
startupOptions,
scanNewlyAddedTableEnabled,
JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
- heartbeatInterval);
+ heartbeatInterval,
+ migrateAll);
}
@Override
@@ -218,6 +221,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(CONNECT_MAX_RETRIES);
options.add(APPEND_MODE);
+ options.add(MIGRATE_ALL);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(HEARTBEAT_INTERVAL);
return options;
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
index 9c1c6baff..792dc304a 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
@@ -79,7 +79,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final boolean scanNewlyAddedTableEnabled;
private final Properties jdbcProperties;
private final Duration heartbeatInterval;
-
+ private final boolean migrateAll;
// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
@@ -116,7 +116,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
double distributionFactorLower,
boolean appendSource,
StartupOptions startupOptions,
- Duration heartbeatInterval) {
+ Duration heartbeatInterval,
+ boolean migrateAll) {
this(
physicalSchema,
port,
@@ -141,7 +142,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
startupOptions,
false,
new Properties(),
- heartbeatInterval);
+ heartbeatInterval,
+ migrateAll);
}
public MySqlTableSource(
@@ -168,7 +170,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
StartupOptions startupOptions,
boolean scanNewlyAddedTableEnabled,
Properties jdbcProperties,
- Duration heartbeatInterval) {
+ Duration heartbeatInterval,
+ boolean migrateAll) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -196,6 +199,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
this.heartbeatInterval = heartbeatInterval;
+ this.migrateAll = migrateAll;
}
@Override
@@ -227,6 +231,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.setAppendSource(appendSource)
.setUserDefinedConverterFactory(
MySqlDeserializationConverterFactory.instance())
+ .setMigrateAll(migrateAll)
.build();
if (enableParallelRead) {
MySqlSource<RowData> parallelSource =
@@ -337,7 +342,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
startupOptions,
scanNewlyAddedTableEnabled,
jdbcProperties,
- heartbeatInterval);
+ heartbeatInterval,
+ migrateAll);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/AllMigrateTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/AllMigrateTest.java
new file mode 100644
index 000000000..7347b2078
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/AllMigrateTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.parser;
+
+import static org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField.MYSQL_METADATA_DATA;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AllMigrateTest {
+
+ private MySqlExtractNode buildAllMigrateExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(
+ new BuiltInFieldInfo("data", new StringFormatInfo(), MYSQL_METADATA_DATA));
+ Map<String, String> option = new HashMap<>();
+ option.put("append-mode", "true");
+ option.put("migrate-all", "true");
+ MySqlExtractNode node = new MySqlExtractNode("1", "mysql_input", fields,
+ null, option, null,
+ Arrays.asList("[\\s\\S]*.*"), "localhost", "root", "password",
+ "[\\s\\S]*.*", null, null, false, null);
+ return node;
+ }
+
+ private KafkaLoadNode buildAllMigrateKafkaNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("data", new StringFormatInfo()));
+ List<FieldRelationShip> relations = Arrays
+ .asList(new FieldRelationShip(new FieldInfo("data", new StringFormatInfo()),
+ new FieldInfo("data", new StringFormatInfo())));
+ CsvFormat csvFormat = new CsvFormat();
+ csvFormat.setDisableQuoteCharacter(true);
+ return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+ "topic", "localhost:9092",
+ csvFormat, null,
+ null, null);
+ }
+
+ private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelationShip(inputIds, outputIds);
+ }
+
+ /**
+ * Tests Flink SQL parsing for the whole-database (all-migrate) scenario.
+ *
+ * @throws Exception The exception may be thrown when executing the case
+ */
+ @Test
+ public void testAllMigrate() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildAllMigrateExtractNode();
+ Node outputNode = buildAllMigrateKafkaNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ FlinkSqlParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
index f97367eb4..14ca70205 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
@@ -52,37 +52,37 @@ public class FlinkSqlParserTest extends AbstractTestBase {
private MySqlExtractNode buildMySQLExtractNode() {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
- new FieldInfo("name", new StringFormatInfo()),
- new FieldInfo("age", new IntFormatInfo()),
- new FieldInfo("salary", new FloatFormatInfo()),
- new FieldInfo("ts", new TimestampFormatInfo()));
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()));
return new MySqlExtractNode("1", "mysql_input", fields,
- null, null, "id",
- Collections.singletonList("test"), "localhost", "username", "username",
- "test_database", null, null,
- null, null);
+ null, null, "id",
+ Collections.singletonList("test"), "localhost", "username", "username",
+ "test_database", null, null,
+ null, null);
}
private KafkaLoadNode buildKafkaNode() {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
- new FieldInfo("name", new StringFormatInfo()),
- new FieldInfo("age", new IntFormatInfo()),
- new FieldInfo("salary", new FloatFormatInfo()),
- new FieldInfo("ts", new TimestampFormatInfo()));
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()));
List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
- new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
- new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
- new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
- new FieldInfo("ts", new TimestampFormatInfo()))
- );
+ .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())),
+ new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())),
+ new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()))
+ );
return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
- "topic", "localhost:9092",
- new CanalJsonFormat(), null,
- null, null);
+ "topic", "localhost:9092",
+ new CanalJsonFormat(), null,
+ null, null);
}
private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
@@ -120,7 +120,7 @@ public class FlinkSqlParserTest extends AbstractTestBase {
* @throws Exception The exception may be thrown when executing the case
*/
@Test
- public void testMysqlToHive() throws Exception {
+ public void testMysqlToHive() {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()