You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/05/09 10:56:09 UTC
[incubator-paimon] 07/08: [flink] add spatial type for mysql cdc action (#1026)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit bcfb51384c7c010e1bfd9a3b4b58346799cdb319
Author: JunZhang <zh...@126.com>
AuthorDate: Thu May 4 15:01:57 2023 +0800
[flink] add spatial type for mysql cdc action (#1026)
---
paimon-flink/paimon-flink-common/pom.xml | 18 ++
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 207 ++++++++++++---------
.../flink/action/cdc/mysql/MySqlTypeUtils.java | 15 ++
.../cdc/mysql/MySqlSyncTableActionITCase.java | 38 +++-
.../src/test/resources/mysql/setup.sql | 26 ++-
5 files changed, 212 insertions(+), 92 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index aabbb2e3d..f850de833 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -37,6 +37,7 @@ under the License.
<flink.version>1.17.0</flink.version>
<flink.cdc.version>2.3.0</flink.cdc.version>
<frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
+ <geometry.version>2.2.0</geometry.version>
</properties>
<dependencies>
@@ -68,6 +69,21 @@ under the License.
<scope>provided</scope>
</dependency>
+ <!-- cdc dependencies start -->
+
+ <dependency>
+ <groupId>com.esri.geometry</groupId>
+ <artifactId>esri-geometry-api</artifactId>
+ <version>${geometry.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
@@ -75,6 +91,8 @@ under the License.
<scope>provided</scope>
</dependency>
+ <!-- cdc dependencies end -->
+
<dependency>
<groupId>com.ververica</groupId>
<artifactId>frocksdbjni</artifactId>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index d48c19301..7bf52e12b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -29,12 +29,15 @@ import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import com.esri.core.geometry.ogc.OGCGeometry;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -200,107 +203,135 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
}
private Map<String, String> extractRow(JsonNode recordRow) {
- Map<String, String> recordMap =
- objectMapper.convertValue(recordRow, new TypeReference<Map<String, String>>() {});
- if (recordMap == null) {
+ // geometry/point values are JSON objects and cannot be converted to String directly,
+ // so convert the row to Map<String, Object> first.
+ Map<String, Object> jsonMap =
+ objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
+ if (jsonMap == null) {
return new HashMap<>();
}
+ Map<String, String> resultMap = new HashMap<>();
for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
String fieldName = field.getKey();
String mySqlType = field.getValue();
- if (recordMap.containsKey(fieldName)) {
- String className = fieldClassNames.get(fieldName);
- String oldValue = recordMap.get(fieldName);
- String newValue = oldValue;
+ Object objectValue = jsonMap.get(fieldName);
+ if (objectValue == null) {
+ continue;
+ }
- if (newValue == null) {
- continue;
+ String className = fieldClassNames.get(fieldName);
+ String oldValue = objectValue.toString();
+ String newValue = oldValue;
+
+ // pay attention to the temporal types
+ // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
+ if ("bytes".equals(mySqlType) && className == null) {
+ // MySQL binary, varbinary, blob
+ newValue = new String(Base64.getDecoder().decode(oldValue));
+ } else if ("bytes".equals(mySqlType)
+ && "org.apache.kafka.connect.data.Decimal".equals(className)) {
+ // MySQL numeric, fixed, decimal
+ try {
+ new BigDecimal(oldValue);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "Invalid big decimal value "
+ + oldValue
+ + ". Make sure that in the `customConverterConfigs` "
+ + "of the JsonDebeziumDeserializationSchema you created, set '"
+ + JsonConverterConfig.DECIMAL_FORMAT_CONFIG
+ + "' to 'numeric'",
+ e);
}
-
- // pay attention to the temporal types
- // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
- if ("bytes".equals(mySqlType) && className == null) {
- // MySQL binary, varbinary, blob
- newValue = new String(Base64.getDecoder().decode(oldValue));
- } else if ("bytes".equals(mySqlType)
- && "org.apache.kafka.connect.data.Decimal".equals(className)) {
- // MySQL numeric, fixed, decimal
- try {
- new BigDecimal(oldValue);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException(
- "Invalid big decimal value "
- + oldValue
- + ". Make sure that in the `customConverterConfigs` "
- + "of the JsonDebeziumDeserializationSchema you created, set '"
- + JsonConverterConfig.DECIMAL_FORMAT_CONFIG
- + "' to 'numeric'",
- e);
+ } else if ("io.debezium.time.Date".equals(className)) {
+ // MySQL date
+ newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
+ } else if ("io.debezium.time.Timestamp".equals(className)) {
+ // MySQL datetime (precision 0-3)
+
+ // display value of datetime is not affected by timezone, see
+ // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
+ // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
+ // for implementation
+ LocalDateTime localDateTime =
+ Instant.ofEpochMilli(Long.parseLong(oldValue))
+ .atZone(ZoneOffset.UTC)
+ .toLocalDateTime();
+ newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
+ } else if ("io.debezium.time.MicroTimestamp".equals(className)) {
+ // MySQL datetime (precision 4-6)
+ long microseconds = Long.parseLong(oldValue);
+ long microsecondsPerSecond = 1_000_000;
+ long nanosecondsPerMicros = 1_000;
+ long seconds = microseconds / microsecondsPerSecond;
+ long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
+
+ // display value of datetime is not affected by timezone, see
+ // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
+ // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
+ // for implementation
+ LocalDateTime localDateTime =
+ Instant.ofEpochSecond(seconds, nanoAdjustment)
+ .atZone(ZoneOffset.UTC)
+ .toLocalDateTime();
+ newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
+ } else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
+ // MySQL timestamp
+
+ // display value of timestamp is affected by timezone, see
+ // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
+ // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
+ // for implementation
+ LocalDateTime localDateTime =
+ Instant.parse(oldValue).atZone(serverTimeZone).toLocalDateTime();
+ newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
+ } else if ("io.debezium.time.MicroTime".equals(className)) {
+ long microseconds = Long.parseLong(oldValue);
+ long microsecondsPerSecond = 1_000_000;
+ long nanosecondsPerMicros = 1_000;
+ long seconds = microseconds / microsecondsPerSecond;
+ long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
+
+ newValue =
+ Instant.ofEpochSecond(seconds, nanoAdjustment)
+ .atZone(ZoneOffset.UTC)
+ .toLocalTime()
+ .toString();
+ } else if ("io.debezium.data.geometry.Point".equals(className)
+ || "io.debezium.data.geometry.Geometry".equals(className)) {
+ JsonNode jsonNode = recordRow.get(fieldName);
+ try {
+ byte[] wkb = jsonNode.get("wkb").binaryValue();
+ String geoJson = OGCGeometry.fromBinary(ByteBuffer.wrap(wkb)).asGeoJson();
+ JsonNode originGeoNode = objectMapper.readTree(geoJson);
+
+ Optional<Integer> srid =
+ Optional.ofNullable(
+ originGeoNode.has("srid")
+ ? originGeoNode.get("srid").intValue()
+ : null);
+ Map<String, Object> geometryInfo = new HashMap<>();
+ String geometryType = originGeoNode.get("type").asText();
+ geometryInfo.put("type", geometryType);
+ if (geometryType.equalsIgnoreCase("GeometryCollection")) {
+ geometryInfo.put("geometries", originGeoNode.get("geometries"));
+ } else {
+ geometryInfo.put("coordinates", originGeoNode.get("coordinates"));
}
- } else if ("io.debezium.time.Date".equals(className)) {
- // MySQL date
- newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
- } else if ("io.debezium.time.Timestamp".equals(className)) {
- // MySQL datetime (precision 0-3)
-
- // display value of datetime is not affected by timezone, see
- // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
- // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
- // for implementation
- LocalDateTime localDateTime =
- Instant.ofEpochMilli(Long.parseLong(oldValue))
- .atZone(ZoneOffset.UTC)
- .toLocalDateTime();
- newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
- } else if ("io.debezium.time.MicroTimestamp".equals(className)) {
- // MySQL datetime (precision 4-6)
- long microseconds = Long.parseLong(oldValue);
- long microsecondsPerSecond = 1_000_000;
- long nanosecondsPerMicros = 1_000;
- long seconds = microseconds / microsecondsPerSecond;
- long nanoAdjustment =
- (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
-
- // display value of datetime is not affected by timezone, see
- // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
- // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
- // for implementation
- LocalDateTime localDateTime =
- Instant.ofEpochSecond(seconds, nanoAdjustment)
- .atZone(ZoneOffset.UTC)
- .toLocalDateTime();
- newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
- } else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
- // MySQL timestamp
-
- // dispaly value of timestamp is affected by timezone, see
- // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
- // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
- // for implementation
- LocalDateTime localDateTime =
- Instant.parse(oldValue).atZone(serverTimeZone).toLocalDateTime();
- newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
- } else if ("io.debezium.time.MicroTime".equals(className)) {
- long microseconds = Long.parseLong(oldValue);
- long microsecondsPerSecond = 1_000_000;
- long nanosecondsPerMicros = 1_000;
- long seconds = microseconds / microsecondsPerSecond;
- long nanoAdjustment =
- (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
-
- newValue =
- Instant.ofEpochSecond(seconds, nanoAdjustment)
- .atZone(ZoneOffset.UTC)
- .toLocalTime()
- .toString();
+ geometryInfo.put("srid", srid.orElse(0));
+ ObjectWriter objectWriter = objectMapper.writer();
+ newValue = objectWriter.writeValueAsString(geometryInfo);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Failed to convert %s to geometry JSON.", jsonNode), e);
}
-
- recordMap.put(fieldName, newValue);
}
+
+ resultMap.put(fieldName, newValue);
}
- return recordMap;
+ return resultMap;
}
private Map<String, String> keyCaseInsensitive(Map<String, String> origin) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index 1a014aeb7..48869c24a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -98,6 +98,13 @@ public class MySqlTypeUtils {
private static final String SET = "SET";
private static final String ENUM = "ENUM";
private static final String GEOMETRY = "GEOMETRY";
+ private static final String POINT = "POINT";
+ private static final String LINESTRING = "LINESTRING";
+ private static final String POLYGON = "POLYGON";
+ private static final String MULTIPOINT = "MULTIPOINT";
+ private static final String MULTILINESTRING = "MULTILINESTRING";
+ private static final String MULTIPOLYGON = "MULTIPOLYGON";
+ private static final String GEOMETRYCOLLECTION = "GEOMETRYCOLLECTION";
private static final String UNKNOWN = "UNKNOWN";
// This length is from JDBC.
@@ -201,6 +208,14 @@ public class MySqlTypeUtils {
case LONGTEXT:
case JSON:
case ENUM:
+ case GEOMETRY:
+ case POINT:
+ case LINESTRING:
+ case POLYGON:
+ case MULTIPOINT:
+ case MULTILINESTRING:
+ case MULTIPOLYGON:
+ case GEOMETRYCOLLECTION:
return DataTypes.STRING();
case BINARY:
return DataTypes.BINARY(Preconditions.checkNotNull(length));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 701390950..d3e950d58 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -464,7 +464,15 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
DataTypes.STRING(), // _json
DataTypes.STRING(), // _enum
DataTypes.INT(), // _year
- DataTypes.TIME() // _time
+ DataTypes.TIME(), // _time
+ DataTypes.STRING(), // _point
+ DataTypes.STRING(), // _geometry
+ DataTypes.STRING(), // _linestring
+ DataTypes.STRING(), // _polygon
+ DataTypes.STRING(), // _multipoint
+ DataTypes.STRING(), // _multiline
+ DataTypes.STRING(), // _multipolygon
+ DataTypes.STRING() // _geometrycollection
},
new String[] {
"_id",
@@ -532,7 +540,15 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
"_json",
"_enum",
"_year",
- "_time"
+ "_time",
+ "_point",
+ "_geometry",
+ "_linestring",
+ "_polygon",
+ "_multipoint",
+ "_multiline",
+ "_multipolygon",
+ "_geometrycollection",
});
FileStoreTable table = getFileStoreTable();
List<String> expected =
@@ -570,7 +586,15 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
+ "{\"a\": \"b\"}, "
+ "value1, "
+ "2023, "
- + "36803000"
+ + "36803000, "
+ + "{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, "
+ + "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, "
+ + "{\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, "
+ + "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, "
+ + "{\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, "
+ + "{\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, "
+ + "{\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, "
+ + "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}"
+ "]",
"+I["
+ "2, 2.2, "
@@ -595,6 +619,14 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
+ "NULL, "
+ "NULL, "
+ "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ "NULL"
+ "]");
waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index d18ccef31..580e13e3f 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -137,6 +137,14 @@ CREATE TABLE all_types_table (
-- YEAR
_year YEAR,
_time TIME,
+ _point POINT,
+ _geometry GEOMETRY,
+ _linestring LINESTRING,
+ _polygon POLYGON,
+ _multipoint MULTIPOINT,
+ _multiline MULTILINESTRING,
+ _multipolygon MULTIPOLYGON,
+ _geometrycollection GEOMETRYCOLLECTION,
PRIMARY KEY (_id)
);
@@ -186,7 +194,15 @@ INSERT INTO all_types_table VALUES (
-- YEAR
2023,
-- TIME,
- '10:13:23'
+ '10:13:23',
+ ST_GeomFromText('POINT(1 1)'),
+ ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
+ ST_GeomFromText('LINESTRING(3 0, 3 3, 3 5)'),
+ ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
+ ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
+ ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
+ ST_GeomFromText('MULTIPOLYGON(((0 0,10 0,10 10,0 10,0 0)),((5 5,7 5,7 7,5 7, 5 5)))'),
+ ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))')
), (
2, 2.2,
NULL, NULL, NULL, NULL, NULL, NULL,
@@ -210,6 +226,14 @@ INSERT INTO all_types_table VALUES (
NULL,
NULL,
NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
NULL
);