You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by "zhongqishang (via GitHub)" <gi...@apache.org> on 2024/03/12 11:11:05 UTC

[PR] [hotfix][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

zhongqishang opened a new pull request, #3130:
URL: https://github.com/apache/flink-cdc/pull/3130

   Fix cdc `RowType` can not convert to flink type
   
   I meet the follow exception:
   ```
   java.lang.ArrayStoreException
   	at java.lang.System.arraycopy(Native Method)
   	at java.util.Arrays.copyOf(Arrays.java:3213)
   	at java.util.ArrayList.toArray(ArrayList.java:413)
   	at java.util.Collections$UnmodifiableCollection.toArray(Collections.java:1036)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544195238


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -26,10 +26,12 @@
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** Utilities for handling {@link DataType}s. */
 public class DataTypeUtils {

Review Comment:
   Get it, Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544207095


##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java:
##########
@@ -27,20 +27,21 @@
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;

Review Comment:
   Rollback.



##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java:
##########
@@ -27,20 +27,21 @@
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;

Review Comment:
   Rollbacked.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544092364


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -26,10 +26,12 @@
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** Utilities for handling {@link DataType}s. */
 public class DataTypeUtils {

Review Comment:
   I get the error `Test class 'DataTypeUtilsTest' is not constructable because it is not 'public'`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544093001


##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.types.utils;
+
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** A test for the {@link org.apache.flink.cdc.common.types.utils.DataTypeUtils}. */
+public class DataTypeUtilsTest {
+    private static final DataType[] allTypes =
+            new DataType[] {
+                DataTypes.BOOLEAN(),
+                DataTypes.BYTES(),
+                DataTypes.BINARY(10),
+                DataTypes.VARBINARY(10),
+                DataTypes.CHAR(10),
+                DataTypes.VARCHAR(10),
+                DataTypes.STRING(),
+                DataTypes.INT(),
+                DataTypes.TINYINT(),
+                DataTypes.SMALLINT(),
+                DataTypes.BIGINT(),
+                DataTypes.DOUBLE(),
+                DataTypes.FLOAT(),
+                DataTypes.DECIMAL(6, 3),
+                DataTypes.DATE(),
+                DataTypes.TIME(),
+                DataTypes.TIME(6),
+                DataTypes.TIMESTAMP(),
+                DataTypes.TIMESTAMP(6),
+                DataTypes.TIMESTAMP_LTZ(),
+                DataTypes.TIMESTAMP_LTZ(6),
+                DataTypes.TIMESTAMP_TZ(),
+                DataTypes.TIMESTAMP_TZ(6),
+                DataTypes.ARRAY(DataTypes.BIGINT()),
+                DataTypes.MAP(DataTypes.SMALLINT(), DataTypes.STRING()),
+                DataTypes.ROW(
+                        DataTypes.FIELD("f1", DataTypes.STRING()),
+                        DataTypes.FIELD("f2", DataTypes.STRING(), "desc")),
+                DataTypes.ROW(DataTypes.SMALLINT(), DataTypes.STRING())
+            };
+
+    @Test
+    public void testToFlinkDataType() {

Review Comment:
   I get the error `Method 'testToFlinkDataType' annotated with '@Test' should be public `



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "Jiabao-Sun (via GitHub)" <gi...@apache.org>.
Jiabao-Sun commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544183691


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -26,10 +26,12 @@
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** Utilities for handling {@link DataType}s. */
 public class DataTypeUtils {

Review Comment:
   Sorry, I marked it on the wrong class.
   It should be `DataTypeUtilsTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544209670


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -134,8 +136,27 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType typ
                         toFlinkDataType(children.get(0)), toFlinkDataType(children.get(1)));
             case ROW:
                 Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children));
-                return org.apache.flink.table.api.DataTypes.ROW(
-                        children.toArray(new org.apache.flink.table.types.DataType[] {}));
+                RowType rowType = (RowType) type;
+                List<org.apache.flink.table.api.DataTypes.Field> fields =
+                        rowType.getFields().stream()
+                                .map(
+                                        dataField ->
+                                                dataField.getDescription() == null
+                                                        ? org.apache.flink.table.api.DataTypes
+                                                                .FIELD(
+                                                                        dataField.getName(),
+                                                                        toFlinkDataType(
+                                                                                dataField
+                                                                                        .getType()))
+                                                        : org.apache.flink.table.api.DataTypes
+                                                                .FIELD(
+                                                                        dataField.getName(),
+                                                                        toFlinkDataType(
+                                                                                dataField
+                                                                                        .getType()),
+                                                                        dataField.getDescription()))
+                                .collect(Collectors.toList());

Review Comment:
   Yes, this will be better, Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544092364


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -26,10 +26,12 @@
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** Utilities for handling {@link DataType}s. */
 public class DataTypeUtils {

Review Comment:
   I get the error `Test class 'DataTypeUtilsTest' is not constructable because it is not 'public'`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "laglangyue (via GitHub)" <gi...@apache.org>.
laglangyue commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544139138


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -134,8 +136,27 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType typ
                         toFlinkDataType(children.get(0)), toFlinkDataType(children.get(1)));
             case ROW:
                 Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children));
-                return org.apache.flink.table.api.DataTypes.ROW(
-                        children.toArray(new org.apache.flink.table.types.DataType[] {}));
+                RowType rowType = (RowType) type;
+                List<org.apache.flink.table.api.DataTypes.Field> fields =
+                        rowType.getFields().stream()
+                                .map(
+                                        dataField ->
+                                                dataField.getDescription() == null
+                                                        ? org.apache.flink.table.api.DataTypes
+                                                                .FIELD(
+                                                                        dataField.getName(),
+                                                                        toFlinkDataType(
+                                                                                dataField
+                                                                                        .getType()))
+                                                        : org.apache.flink.table.api.DataTypes
+                                                                .FIELD(
+                                                                        dataField.getName(),
+                                                                        toFlinkDataType(
+                                                                                dataField
+                                                                                        .getType()),
+                                                                        dataField.getDescription()))
+                                .collect(Collectors.toList());

Review Comment:
   the lambda is so ugly, maybe you can define a map function



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544202638


##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java:
##########
@@ -27,20 +27,21 @@
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;

Review Comment:
   OK, I will be separate a new PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "laglangyue (via GitHub)" <gi...@apache.org>.
laglangyue commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544193093


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -134,8 +136,27 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType typ
                         toFlinkDataType(children.get(0)), toFlinkDataType(children.get(1)));
             case ROW:
                 Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children));
-                return org.apache.flink.table.api.DataTypes.ROW(
-                        children.toArray(new org.apache.flink.table.types.DataType[] {}));
+                RowType rowType = (RowType) type;
+                List<org.apache.flink.table.api.DataTypes.Field> fields =
+                        rowType.getFields().stream()
+                                .map(
+                                        dataField ->
+                                                dataField.getDescription() == null
+                                                        ? org.apache.flink.table.api.DataTypes
+                                                                .FIELD(
+                                                                        dataField.getName(),
+                                                                        toFlinkDataType(
+                                                                                dataField
+                                                                                        .getType()))
+                                                        : org.apache.flink.table.api.DataTypes
+                                                                .FIELD(
+                                                                        dataField.getName(),
+                                                                        toFlinkDataType(
+                                                                                dataField
+                                                                                        .getType()),
+                                                                        dataField.getDescription()))
+                                .collect(Collectors.toList());

Review Comment:
   In my opinion, `toFlinkDataTypeField` is a method of DataField, not a static function in utils.
   Hope for a idea from Project member.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [hotfix][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "PatrickRen (via GitHub)" <gi...@apache.org>.
PatrickRen commented on PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#issuecomment-2022134689

   @zhongqishang Thanks for the PR! Could you open a Jira issue for this one? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544155132


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -134,8 +136,27 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType typ
                         toFlinkDataType(children.get(0)), toFlinkDataType(children.get(1)));
             case ROW:
                 Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children));
-                return org.apache.flink.table.api.DataTypes.ROW(
-                        children.toArray(new org.apache.flink.table.types.DataType[] {}));
+                RowType rowType = (RowType) type;
+                List<org.apache.flink.table.api.DataTypes.Field> fields =
+                        rowType.getFields().stream()
+                                .map(
+                                        dataField ->
+                                                dataField.getDescription() == null
+                                                        ? org.apache.flink.table.api.DataTypes
+                                                                .FIELD(
+                                                                        dataField.getName(),
+                                                                        toFlinkDataType(
+                                                                                dataField
+                                                                                        .getType()))
+                                                        : org.apache.flink.table.api.DataTypes
+                                                                .FIELD(
+                                                                        dataField.getName(),
+                                                                        toFlinkDataType(
+                                                                                dataField
+                                                                                        .getType()),
+                                                                        dataField.getDescription()))
+                                .collect(Collectors.toList());

Review Comment:
   Add a named `toFlinkDataTypeField` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544195401


##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.types.utils;
+
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** A test for the {@link org.apache.flink.cdc.common.types.utils.DataTypeUtils}. */
+public class DataTypeUtilsTest {
+    private static final DataType[] allTypes =
+            new DataType[] {
+                DataTypes.BOOLEAN(),
+                DataTypes.BYTES(),
+                DataTypes.BINARY(10),
+                DataTypes.VARBINARY(10),
+                DataTypes.CHAR(10),
+                DataTypes.VARCHAR(10),
+                DataTypes.STRING(),
+                DataTypes.INT(),
+                DataTypes.TINYINT(),
+                DataTypes.SMALLINT(),
+                DataTypes.BIGINT(),
+                DataTypes.DOUBLE(),
+                DataTypes.FLOAT(),
+                DataTypes.DECIMAL(6, 3),
+                DataTypes.DATE(),
+                DataTypes.TIME(),
+                DataTypes.TIME(6),
+                DataTypes.TIMESTAMP(),
+                DataTypes.TIMESTAMP(6),
+                DataTypes.TIMESTAMP_LTZ(),
+                DataTypes.TIMESTAMP_LTZ(6),
+                DataTypes.TIMESTAMP_TZ(),
+                DataTypes.TIMESTAMP_TZ(6),
+                DataTypes.ARRAY(DataTypes.BIGINT()),
+                DataTypes.MAP(DataTypes.SMALLINT(), DataTypes.STRING()),
+                DataTypes.ROW(
+                        DataTypes.FIELD("f1", DataTypes.STRING()),
+                        DataTypes.FIELD("f2", DataTypes.STRING(), "desc")),
+                DataTypes.ROW(DataTypes.SMALLINT(), DataTypes.STRING())
+            };
+
+    @Test
+    public void testToFlinkDataType() {

Review Comment:
   Get it, Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544113238


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -26,10 +26,12 @@
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** Utilities for handling {@link DataType}s. */
 public class DataTypeUtils {

Review Comment:
   Changing the scope will be compile failed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "Jiabao-Sun (via GitHub)" <gi...@apache.org>.
Jiabao-Sun commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544023697


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -26,10 +26,12 @@
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** Utilities for handling {@link DataType}s. */
 public class DataTypeUtils {

Review Comment:
   ```suggestion 
   class DataTypeUtils {
   ```



##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.types.utils;
+
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** A test for the {@link org.apache.flink.cdc.common.types.utils.DataTypeUtils}. */
+public class DataTypeUtilsTest {
+    private static final DataType[] allTypes =
+            new DataType[] {
+                DataTypes.BOOLEAN(),
+                DataTypes.BYTES(),
+                DataTypes.BINARY(10),
+                DataTypes.VARBINARY(10),
+                DataTypes.CHAR(10),
+                DataTypes.VARCHAR(10),
+                DataTypes.STRING(),
+                DataTypes.INT(),
+                DataTypes.TINYINT(),
+                DataTypes.SMALLINT(),
+                DataTypes.BIGINT(),
+                DataTypes.DOUBLE(),
+                DataTypes.FLOAT(),
+                DataTypes.DECIMAL(6, 3),
+                DataTypes.DATE(),
+                DataTypes.TIME(),
+                DataTypes.TIME(6),
+                DataTypes.TIMESTAMP(),
+                DataTypes.TIMESTAMP(6),
+                DataTypes.TIMESTAMP_LTZ(),
+                DataTypes.TIMESTAMP_LTZ(6),
+                DataTypes.TIMESTAMP_TZ(),
+                DataTypes.TIMESTAMP_TZ(6),
+                DataTypes.ARRAY(DataTypes.BIGINT()),
+                DataTypes.MAP(DataTypes.SMALLINT(), DataTypes.STRING()),
+                DataTypes.ROW(
+                        DataTypes.FIELD("f1", DataTypes.STRING()),
+                        DataTypes.FIELD("f2", DataTypes.STRING(), "desc")),
+                DataTypes.ROW(DataTypes.SMALLINT(), DataTypes.STRING())
+            };
+
+    @Test
+    public void testToFlinkDataType() {

Review Comment:
   ```suggestion
       void testToFlinkDataType() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "Jiabao-Sun (via GitHub)" <gi...@apache.org>.
Jiabao-Sun merged PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544093113


##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.types.utils;
+
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.Assert.assertEquals;
+
+/** A test for the {@link org.apache.flink.cdc.common.types.utils.DataTypeUtils}. */
+public class DataTypeUtilsTest {

Review Comment:
   Done.



##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.types.utils;
+
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.Assert.assertEquals;
+
+/** A test for the {@link org.apache.flink.cdc.common.types.utils.DataTypeUtils}. */
+public class DataTypeUtilsTest {
+    private final DataType[] allTypes =

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "Jiabao-Sun (via GitHub)" <gi...@apache.org>.
Jiabao-Sun commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544185141


##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.types.utils;
+
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** A test for the {@link org.apache.flink.cdc.common.types.utils.DataTypeUtils}. */
+public class DataTypeUtilsTest {
+    private static final DataType[] allTypes =
+            new DataType[] {
+                DataTypes.BOOLEAN(),
+                DataTypes.BYTES(),
+                DataTypes.BINARY(10),
+                DataTypes.VARBINARY(10),
+                DataTypes.CHAR(10),
+                DataTypes.VARCHAR(10),
+                DataTypes.STRING(),
+                DataTypes.INT(),
+                DataTypes.TINYINT(),
+                DataTypes.SMALLINT(),
+                DataTypes.BIGINT(),
+                DataTypes.DOUBLE(),
+                DataTypes.FLOAT(),
+                DataTypes.DECIMAL(6, 3),
+                DataTypes.DATE(),
+                DataTypes.TIME(),
+                DataTypes.TIME(6),
+                DataTypes.TIMESTAMP(),
+                DataTypes.TIMESTAMP(6),
+                DataTypes.TIMESTAMP_LTZ(),
+                DataTypes.TIMESTAMP_LTZ(6),
+                DataTypes.TIMESTAMP_TZ(),
+                DataTypes.TIMESTAMP_TZ(6),
+                DataTypes.ARRAY(DataTypes.BIGINT()),
+                DataTypes.MAP(DataTypes.SMALLINT(), DataTypes.STRING()),
+                DataTypes.ROW(
+                        DataTypes.FIELD("f1", DataTypes.STRING()),
+                        DataTypes.FIELD("f2", DataTypes.STRING(), "desc")),
+                DataTypes.ROW(DataTypes.SMALLINT(), DataTypes.STRING())
+            };
+
+    @Test
+    public void testToFlinkDataType() {

Review Comment:
   In JUnit5, test  class and methods do not need to be declared as public.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "Jiabao-Sun (via GitHub)" <gi...@apache.org>.
Jiabao-Sun commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1543970905


##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.types.utils;
+
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.Assert.assertEquals;
+
+/** A test for the {@link org.apache.flink.cdc.common.types.utils.DataTypeUtils}. */
+public class DataTypeUtilsTest {
+    private final DataType[] allTypes =

Review Comment:
   nit: can be const.



##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.types.utils;
+
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.Assert.assertEquals;
+
+/** A test for the {@link org.apache.flink.cdc.common.types.utils.DataTypeUtils}. */
+public class DataTypeUtilsTest {

Review Comment:
   Thanks @zhongqishang for this hotfix. 
   Looks overall good to me.
   Could you help change the test from JUnit4 to JUnit5 and minimize the visibility for class and methods?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "laglangyue (via GitHub)" <gi...@apache.org>.
laglangyue commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544193093


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -134,8 +136,27 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType typ
                         toFlinkDataType(children.get(0)), toFlinkDataType(children.get(1)));
             case ROW:
                 Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children));
-                return org.apache.flink.table.api.DataTypes.ROW(
-                        children.toArray(new org.apache.flink.table.types.DataType[] {}));
+                RowType rowType = (RowType) type;
+                List<org.apache.flink.table.api.DataTypes.Field> fields =
+                        rowType.getFields().stream()
+                                .map(
+                                        dataField ->
+                                                dataField.getDescription() == null
+                                                        ? org.apache.flink.table.api.DataTypes
+                                                                .FIELD(
+                                                                        dataField.getName(),
+                                                                        toFlinkDataType(
+                                                                                dataField
+                                                                                        .getType()))
+                                                        : org.apache.flink.table.api.DataTypes
+                                                                .FIELD(
+                                                                        dataField.getName(),
+                                                                        toFlinkDataType(
+                                                                                dataField
+                                                                                        .getType()),
+                                                                        dataField.getDescription()))
+                                .collect(Collectors.toList());

Review Comment:
   In my opinion, `toFlinkDataTypeField` is a method of DataField, not a static function in utils.
   Hope for a review from Project member.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "zhongqishang (via GitHub)" <gi...@apache.org>.
zhongqishang commented on PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#issuecomment-2026721760

   @Jiabao-Sun Thanks for your detailed review, all fixed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "laglangyue (via GitHub)" <gi...@apache.org>.
laglangyue commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544200476


##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java:
##########
@@ -27,20 +27,21 @@
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;

Review Comment:
   It change SchemaUtilsTest IT to Junit5, it seems not related to this PR. And we separate a PR to do it in usually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-34948][cdc-common] Fix cdc `RowType` can not convert to flink type [flink-cdc]

Posted by "Jiabao-Sun (via GitHub)" <gi...@apache.org>.
Jiabao-Sun commented on code in PR #3130:
URL: https://github.com/apache/flink-cdc/pull/3130#discussion_r1544023070


##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.types.utils;
+
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** A test for the {@link org.apache.flink.cdc.common.types.utils.DataTypeUtils}. */
+public class DataTypeUtilsTest {
+    private static final DataType[] allTypes =

Review Comment:
   ```suggestion
       private static final DataType[] ALL_TYPES =
   ```



##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.types.utils;
+
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.jupiter.api.Assertions.assertEquals;

Review Comment:
   ```suggestion
   import static org.assertj.core.api.Assertions.assertThat;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org