You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/05/09 10:56:09 UTC

[incubator-paimon] 07/08: [flink] add spatial type for mysql cdc action (#1026)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit bcfb51384c7c010e1bfd9a3b4b58346799cdb319
Author: JunZhang <zh...@126.com>
AuthorDate: Thu May 4 15:01:57 2023 +0800

    [flink] add spatial type for mysql cdc action (#1026)
---
 paimon-flink/paimon-flink-common/pom.xml           |  18 ++
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 207 ++++++++++++---------
 .../flink/action/cdc/mysql/MySqlTypeUtils.java     |  15 ++
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  38 +++-
 .../src/test/resources/mysql/setup.sql             |  26 ++-
 5 files changed, 212 insertions(+), 92 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index aabbb2e3d..f850de833 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -37,6 +37,7 @@ under the License.
         <flink.version>1.17.0</flink.version>
         <flink.cdc.version>2.3.0</flink.cdc.version>
         <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
+        <geometry.version>2.2.0</geometry.version>
     </properties>
 
     <dependencies>
@@ -68,6 +69,21 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <!-- cdc dependencies start -->
+
+        <dependency>
+            <groupId>com.esri.geometry</groupId>
+            <artifactId>esri-geometry-api</artifactId>
+            <version>${geometry.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>flink-connector-mysql-cdc</artifactId>
@@ -75,6 +91,8 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <!-- cdc dependencies end -->
+
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>frocksdbjni</artifactId>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index d48c19301..7bf52e12b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -29,12 +29,15 @@ import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 
+import com.esri.core.geometry.ogc.OGCGeometry;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -200,107 +203,135 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
     }
 
     private Map<String, String> extractRow(JsonNode recordRow) {
-        Map<String, String> recordMap =
-                objectMapper.convertValue(recordRow, new TypeReference<Map<String, String>>() {});
-        if (recordMap == null) {
+        // geometry and point values arrive as JSON objects, which cannot be converted to
+        // String directly, so convert every field to Object first.
+        Map<String, Object> jsonMap =
+                objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
+        if (jsonMap == null) {
             return new HashMap<>();
         }
 
+        Map<String, String> resultMap = new HashMap<>();
         for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
             String fieldName = field.getKey();
             String mySqlType = field.getValue();
-            if (recordMap.containsKey(fieldName)) {
-                String className = fieldClassNames.get(fieldName);
-                String oldValue = recordMap.get(fieldName);
-                String newValue = oldValue;
+            Object objectValue = jsonMap.get(fieldName);
+            if (objectValue == null) {
+                continue;
+            }
 
-                if (newValue == null) {
-                    continue;
+            String className = fieldClassNames.get(fieldName);
+            String oldValue = objectValue.toString();
+            String newValue = oldValue;
+
+            // pay attention to the temporal types
+            // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
+            if ("bytes".equals(mySqlType) && className == null) {
+                // MySQL binary, varbinary, blob
+                newValue = new String(Base64.getDecoder().decode(oldValue));
+            } else if ("bytes".equals(mySqlType)
+                    && "org.apache.kafka.connect.data.Decimal".equals(className)) {
+                // MySQL numeric, fixed, decimal
+                try {
+                    new BigDecimal(oldValue);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException(
+                            "Invalid big decimal value "
+                                    + oldValue
+                                    + ". Make sure that in the `customConverterConfigs` "
+                                    + "of the JsonDebeziumDeserializationSchema you created, set '"
+                                    + JsonConverterConfig.DECIMAL_FORMAT_CONFIG
+                                    + "' to 'numeric'",
+                            e);
                 }
-
-                // pay attention to the temporal types
-                // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
-                if ("bytes".equals(mySqlType) && className == null) {
-                    // MySQL binary, varbinary, blob
-                    newValue = new String(Base64.getDecoder().decode(oldValue));
-                } else if ("bytes".equals(mySqlType)
-                        && "org.apache.kafka.connect.data.Decimal".equals(className)) {
-                    // MySQL numeric, fixed, decimal
-                    try {
-                        new BigDecimal(oldValue);
-                    } catch (NumberFormatException e) {
-                        throw new IllegalArgumentException(
-                                "Invalid big decimal value "
-                                        + oldValue
-                                        + ". Make sure that in the `customConverterConfigs` "
-                                        + "of the JsonDebeziumDeserializationSchema you created, set '"
-                                        + JsonConverterConfig.DECIMAL_FORMAT_CONFIG
-                                        + "' to 'numeric'",
-                                e);
+            } else if ("io.debezium.time.Date".equals(className)) {
+                // MySQL date
+                newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
+            } else if ("io.debezium.time.Timestamp".equals(className)) {
+                // MySQL datetime (precision 0-3)
+
+                // display value of datetime is not affected by timezone, see
+                // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
+                // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
+                // for implementation
+                LocalDateTime localDateTime =
+                        Instant.ofEpochMilli(Long.parseLong(oldValue))
+                                .atZone(ZoneOffset.UTC)
+                                .toLocalDateTime();
+                newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
+            } else if ("io.debezium.time.MicroTimestamp".equals(className)) {
+                // MySQL datetime (precision 4-6)
+                long microseconds = Long.parseLong(oldValue);
+                long microsecondsPerSecond = 1_000_000;
+                long nanosecondsPerMicros = 1_000;
+                long seconds = microseconds / microsecondsPerSecond;
+                long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
+
+                // display value of datetime is not affected by timezone, see
+                // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
+                // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
+                // for implementation
+                LocalDateTime localDateTime =
+                        Instant.ofEpochSecond(seconds, nanoAdjustment)
+                                .atZone(ZoneOffset.UTC)
+                                .toLocalDateTime();
+                newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
+            } else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
+                // MySQL timestamp
+
+                // display value of timestamp is affected by timezone, see
+                // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
+                // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
+                // for implementation
+                LocalDateTime localDateTime =
+                        Instant.parse(oldValue).atZone(serverTimeZone).toLocalDateTime();
+                newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
+            } else if ("io.debezium.time.MicroTime".equals(className)) {
+                long microseconds = Long.parseLong(oldValue);
+                long microsecondsPerSecond = 1_000_000;
+                long nanosecondsPerMicros = 1_000;
+                long seconds = microseconds / microsecondsPerSecond;
+                long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
+
+                newValue =
+                        Instant.ofEpochSecond(seconds, nanoAdjustment)
+                                .atZone(ZoneOffset.UTC)
+                                .toLocalTime()
+                                .toString();
+            } else if ("io.debezium.data.geometry.Point".equals(className)
+                    || "io.debezium.data.geometry.Geometry".equals(className)) {
+                JsonNode jsonNode = recordRow.get(fieldName);
+                try {
+                    byte[] wkb = jsonNode.get("wkb").binaryValue();
+                    String geoJson = OGCGeometry.fromBinary(ByteBuffer.wrap(wkb)).asGeoJson();
+                    JsonNode originGeoNode = objectMapper.readTree(geoJson);
+                    // SRID comes from the Debezium struct; esri GeoJSON has no 'srid' key.
+                    Optional<Integer> srid =
+                            Optional.ofNullable(
+                                    jsonNode.hasNonNull("srid")
+                                            ? jsonNode.get("srid").intValue()
+                                            : null);
+                    Map<String, Object> geometryInfo = new HashMap<>();
+                    String geometryType = originGeoNode.get("type").asText();
+                    geometryInfo.put("type", geometryType);
+                    if (geometryType.equalsIgnoreCase("GeometryCollection")) {
+                        geometryInfo.put("geometries", originGeoNode.get("geometries"));
+                    } else {
+                        geometryInfo.put("coordinates", originGeoNode.get("coordinates"));
+                    }
-                } else if ("io.debezium.time.Date".equals(className)) {
-                    // MySQL date
-                    newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
-                } else if ("io.debezium.time.Timestamp".equals(className)) {
-                    // MySQL datetime (precision 0-3)
-
-                    // display value of datetime is not affected by timezone, see
-                    // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
-                    // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
-                    // for implementation
-                    LocalDateTime localDateTime =
-                            Instant.ofEpochMilli(Long.parseLong(oldValue))
-                                    .atZone(ZoneOffset.UTC)
-                                    .toLocalDateTime();
-                    newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
-                } else if ("io.debezium.time.MicroTimestamp".equals(className)) {
-                    // MySQL datetime (precision 4-6)
-                    long microseconds = Long.parseLong(oldValue);
-                    long microsecondsPerSecond = 1_000_000;
-                    long nanosecondsPerMicros = 1_000;
-                    long seconds = microseconds / microsecondsPerSecond;
-                    long nanoAdjustment =
-                            (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
-
-                    // display value of datetime is not affected by timezone, see
-                    // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
-                    // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
-                    // for implementation
-                    LocalDateTime localDateTime =
-                            Instant.ofEpochSecond(seconds, nanoAdjustment)
-                                    .atZone(ZoneOffset.UTC)
-                                    .toLocalDateTime();
-                    newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
-                } else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
-                    // MySQL timestamp
-
-                    // dispaly value of timestamp is affected by timezone, see
-                    // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
-                    // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
-                    // for implementation
-                    LocalDateTime localDateTime =
-                            Instant.parse(oldValue).atZone(serverTimeZone).toLocalDateTime();
-                    newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
-                } else if ("io.debezium.time.MicroTime".equals(className)) {
-                    long microseconds = Long.parseLong(oldValue);
-                    long microsecondsPerSecond = 1_000_000;
-                    long nanosecondsPerMicros = 1_000;
-                    long seconds = microseconds / microsecondsPerSecond;
-                    long nanoAdjustment =
-                            (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
-
-                    newValue =
-                            Instant.ofEpochSecond(seconds, nanoAdjustment)
-                                    .atZone(ZoneOffset.UTC)
-                                    .toLocalTime()
-                                    .toString();
+                    geometryInfo.put("srid", srid.orElse(0));
+                    ObjectWriter objectWriter = objectMapper.writer();
+                    newValue = objectWriter.writeValueAsString(geometryInfo);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(
+                            String.format("Failed to convert %s to geometry JSON.", jsonNode), e);
                 }
-
-                recordMap.put(fieldName, newValue);
             }
+
+            resultMap.put(fieldName, newValue);
         }
 
-        return recordMap;
+        return resultMap;
     }
 
     private Map<String, String> keyCaseInsensitive(Map<String, String> origin) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index 1a014aeb7..48869c24a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -98,6 +98,13 @@ public class MySqlTypeUtils {
     private static final String SET = "SET";
     private static final String ENUM = "ENUM";
     private static final String GEOMETRY = "GEOMETRY";
+    private static final String POINT = "POINT";
+    private static final String LINESTRING = "LINESTRING";
+    private static final String POLYGON = "POLYGON";
+    private static final String MULTIPOINT = "MULTIPOINT";
+    private static final String MULTILINESTRING = "MULTILINESTRING";
+    private static final String MULTIPOLYGON = "MULTIPOLYGON";
+    private static final String GEOMETRYCOLLECTION = "GEOMETRYCOLLECTION";
     private static final String UNKNOWN = "UNKNOWN";
 
     // This length is from JDBC.
@@ -201,6 +208,14 @@ public class MySqlTypeUtils {
             case LONGTEXT:
             case JSON:
             case ENUM:
+            case GEOMETRY:
+            case POINT:
+            case LINESTRING:
+            case POLYGON:
+            case MULTIPOINT:
+            case MULTILINESTRING:
+            case MULTIPOLYGON:
+            case GEOMETRYCOLLECTION:
                 return DataTypes.STRING();
             case BINARY:
                 return DataTypes.BINARY(Preconditions.checkNotNull(length));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 701390950..d3e950d58 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -464,7 +464,15 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                             DataTypes.STRING(), // _json
                             DataTypes.STRING(), // _enum
                             DataTypes.INT(), // _year
-                            DataTypes.TIME() // _time
+                            DataTypes.TIME(), // _time
+                            DataTypes.STRING(), // _point
+                            DataTypes.STRING(), // _geometry
+                            DataTypes.STRING(), // _linestring
+                            DataTypes.STRING(), // _polygon
+                            DataTypes.STRING(), // _multipoint
+                            DataTypes.STRING(), // _multiline
+                            DataTypes.STRING(), // _multipolygon
+                            DataTypes.STRING() // _geometrycollection
                         },
                         new String[] {
                             "_id",
@@ -532,7 +540,15 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                             "_json",
                             "_enum",
                             "_year",
-                            "_time"
+                            "_time",
+                            "_point",
+                            "_geometry",
+                            "_linestring",
+                            "_polygon",
+                            "_multipoint",
+                            "_multiline",
+                            "_multipolygon",
+                            "_geometrycollection",
                         });
         FileStoreTable table = getFileStoreTable();
         List<String> expected =
@@ -570,7 +586,15 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                                 + "{\"a\": \"b\"}, "
                                 + "value1, "
                                 + "2023, "
-                                + "36803000"
+                                + "36803000, "
+                                + "{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, "
+                                + "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, "
+                                + "{\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, "
+                                + "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, "
+                                + "{\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, "
+                                + "{\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, "
+                                + "{\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, "
+                                + "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}"
                                 + "]",
                         "+I["
                                 + "2, 2.2, "
@@ -595,6 +619,14 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                                 + "NULL, "
                                 + "NULL, "
                                 + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
                                 + "NULL"
                                 + "]");
         waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index d18ccef31..580e13e3f 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -137,6 +137,14 @@ CREATE TABLE all_types_table (
     -- YEAR
     _year YEAR,
     _time TIME,
+    _point POINT,
+    _geometry GEOMETRY,
+    _linestring LINESTRING,
+    _polygon  POLYGON,
+    _multipoint  MULTIPOINT,
+    _multiline  MULTILINESTRING,
+    _multipolygon  MULTIPOLYGON,
+    _geometrycollection GEOMETRYCOLLECTION,
     PRIMARY KEY (_id)
 );
 
@@ -186,7 +194,15 @@ INSERT INTO all_types_table VALUES (
      -- YEAR
      2023,
      -- TIME,
-     '10:13:23'
+     '10:13:23',
+    ST_GeomFromText('POINT(1 1)'),
+    ST_GeomFromText('POLYGON((1 1, 2 1, 2 2,  1 2, 1 1))'),
+    ST_GeomFromText('LINESTRING(3 0, 3 3, 3 5)'),
+    ST_GeomFromText('POLYGON((1 1, 2 1, 2 2,  1 2, 1 1))'),
+    ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
+    ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
+    ST_GeomFromText('MULTIPOLYGON(((0 0,10 0,10 10,0 10,0 0)),((5 5,7 5,7 7,5 7, 5 5)))'),
+    ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))')
 ), (
     2, 2.2,
     NULL, NULL, NULL, NULL, NULL, NULL,
@@ -210,6 +226,14 @@ INSERT INTO all_types_table VALUES (
     NULL,
     NULL,
     NULL,
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+    NULL,
     NULL
 );