You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@paimon.apache.org by "tsreaper (via GitHub)" <gi...@apache.org> on 2023/03/22 08:00:35 UTC

[GitHub] [incubator-paimon] tsreaper opened a new pull request, #683: [FLINK-31558] Support updating schema type with restrictions in CDC sink

tsreaper opened a new pull request, #683:
URL: https://github.com/apache/incubator-paimon/pull/683

   ### Purpose
   
   It is common for database users to update column types to a wider type, such as changing `INT` types to `BIGINT`s, and `CHAR`s to `VARCHAR`s. We should support these common usages.
   
   ### Tests
   
   * SchemaAwareStoreWriteOperatorTest
   * FlinkCdcSinkITCase
   
   ### API and Format 
   
   N/A
   
   ### Documentation
   
   N/A
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #683: [FLINK-31558] Support updating schema type with restrictions in CDC sink

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #683:
URL: https://github.com/apache/incubator-paimon/pull/683#discussion_r1144477732


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java:
##########
@@ -47,18 +49,52 @@ public SchemaChangeProcessFunction(SchemaManager schemaManager) {
     public void processElement(
             SchemaChange schemaChange, Context context, Collector<Void> collector)
             throws Exception {
-        Preconditions.checkArgument(
-                schemaChange instanceof SchemaChange.AddColumn,
-                "Currently, only SchemaChange.AddColumn is supported.");
-        try {
-            schemaManager.commitChanges(schemaChange);
-        } catch (Exception e) {
-            // This is normal. For example when a table is split into multiple database tables, all
-            // these tables will be added the same column. However schemaManager can't handle
-            // duplicated column adds, so we just catch the exception and log it.
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Failed to perform schema change {}", schemaChange, e);
+        if (schemaChange instanceof SchemaChange.AddColumn) {
+            try {
+                schemaManager.commitChanges(schemaChange);
+            } catch (Exception e) {
+                // This is normal. For example when a table is split into multiple database tables,
+                // all these tables will be added the same column. However schemaManager can't
+                // handle duplicated column adds, so we just catch the exception and log it.
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to perform SchemaChange.AddColumn {}", schemaChange, e);
+                }
             }
+        } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
+            SchemaChange.UpdateColumnType updateColumnType =
+                    (SchemaChange.UpdateColumnType) schemaChange;
+            TableSchema schema =
+                    schemaManager
+                            .latest()
+                            .orElseThrow(
+                                    () ->
+                                            new RuntimeException(
+                                                    "Table does not exist. This is unexpected."));
+            int idx = schema.fieldNames().indexOf(updateColumnType.fieldName());
+            Preconditions.checkState(

Review Comment:
   Document this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] tsreaper merged pull request #683: [FLINK-31558] Support updating schema type with restrictions in CDC sink

Posted by "tsreaper (via GitHub)" <gi...@apache.org>.
tsreaper merged PR #683:
URL: https://github.com/apache/incubator-paimon/pull/683


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #683: [FLINK-31558] Support updating schema type with restrictions in CDC sink

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #683:
URL: https://github.com/apache/incubator-paimon/pull/683#discussion_r1144383177


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java:
##########
@@ -47,18 +49,52 @@ public SchemaChangeProcessFunction(SchemaManager schemaManager) {
     public void processElement(
             SchemaChange schemaChange, Context context, Collector<Void> collector)
             throws Exception {
-        Preconditions.checkArgument(
-                schemaChange instanceof SchemaChange.AddColumn,
-                "Currently, only SchemaChange.AddColumn is supported.");
-        try {
-            schemaManager.commitChanges(schemaChange);
-        } catch (Exception e) {
-            // This is normal. For example when a table is split into multiple database tables, all
-            // these tables will be added the same column. However schemaManager can't handle
-            // duplicated column adds, so we just catch the exception and log it.
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Failed to perform schema change {}", schemaChange, e);
+        if (schemaChange instanceof SchemaChange.AddColumn) {
+            try {
+                schemaManager.commitChanges(schemaChange);
+            } catch (Exception e) {

Review Comment:
   Can we just catch specific exception?
   I am concern that there is some exception from DFS.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java:
##########
@@ -47,18 +49,52 @@ public SchemaChangeProcessFunction(SchemaManager schemaManager) {
     public void processElement(
             SchemaChange schemaChange, Context context, Collector<Void> collector)
             throws Exception {
-        Preconditions.checkArgument(
-                schemaChange instanceof SchemaChange.AddColumn,
-                "Currently, only SchemaChange.AddColumn is supported.");
-        try {
-            schemaManager.commitChanges(schemaChange);
-        } catch (Exception e) {
-            // This is normal. For example when a table is split into multiple database tables, all
-            // these tables will be added the same column. However schemaManager can't handle
-            // duplicated column adds, so we just catch the exception and log it.
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Failed to perform schema change {}", schemaChange, e);
+        if (schemaChange instanceof SchemaChange.AddColumn) {
+            try {
+                schemaManager.commitChanges(schemaChange);
+            } catch (Exception e) {
+                // This is normal. For example when a table is split into multiple database tables,
+                // all these tables will be added the same column. However schemaManager can't
+                // handle duplicated column adds, so we just catch the exception and log it.
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to perform SchemaChange.AddColumn {}", schemaChange, e);
+                }
             }
+        } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
+            SchemaChange.UpdateColumnType updateColumnType =
+                    (SchemaChange.UpdateColumnType) schemaChange;
+            TableSchema schema =
+                    schemaManager
+                            .latest()
+                            .orElseThrow(
+                                    () ->
+                                            new RuntimeException(
+                                                    "Table does not exist. This is unexpected."));
+            int idx = schema.fieldNames().indexOf(updateColumnType.fieldName());
+            Preconditions.checkState(

Review Comment:
   Do we need to have these validations?
   It looks like these should be done in `schemaManager.commitChanges`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] tsreaper commented on a diff in pull request #683: [FLINK-31558] Support updating schema type with restrictions in CDC sink

Posted by "tsreaper (via GitHub)" <gi...@apache.org>.
tsreaper commented on code in PR #683:
URL: https://github.com/apache/incubator-paimon/pull/683#discussion_r1144391145


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java:
##########
@@ -47,18 +49,52 @@ public SchemaChangeProcessFunction(SchemaManager schemaManager) {
     public void processElement(
             SchemaChange schemaChange, Context context, Collector<Void> collector)
             throws Exception {
-        Preconditions.checkArgument(
-                schemaChange instanceof SchemaChange.AddColumn,
-                "Currently, only SchemaChange.AddColumn is supported.");
-        try {
-            schemaManager.commitChanges(schemaChange);
-        } catch (Exception e) {
-            // This is normal. For example when a table is split into multiple database tables, all
-            // these tables will be added the same column. However schemaManager can't handle
-            // duplicated column adds, so we just catch the exception and log it.
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Failed to perform schema change {}", schemaChange, e);
+        if (schemaChange instanceof SchemaChange.AddColumn) {
+            try {
+                schemaManager.commitChanges(schemaChange);
+            } catch (Exception e) {
+                // This is normal. For example when a table is split into multiple database tables,
+                // all these tables will be added the same column. However schemaManager can't
+                // handle duplicated column adds, so we just catch the exception and log it.
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to perform SchemaChange.AddColumn {}", schemaChange, e);
+                }
             }
+        } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
+            SchemaChange.UpdateColumnType updateColumnType =
+                    (SchemaChange.UpdateColumnType) schemaChange;
+            TableSchema schema =
+                    schemaManager
+                            .latest()
+                            .orElseThrow(
+                                    () ->
+                                            new RuntimeException(
+                                                    "Table does not exist. This is unexpected."));
+            int idx = schema.fieldNames().indexOf(updateColumnType.fieldName());
+            Preconditions.checkState(

Review Comment:
   It is OK for `schemaManager.commitChanges` to convert an `INT` column to `STRING`, but this is not OK for CDC sink.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #683: [FLINK-31558] Support updating schema type with restrictions in CDC sink

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #683:
URL: https://github.com/apache/incubator-paimon/pull/683#discussion_r1144479954


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java:
##########
@@ -47,18 +49,52 @@ public SchemaChangeProcessFunction(SchemaManager schemaManager) {
     public void processElement(
             SchemaChange schemaChange, Context context, Collector<Void> collector)
             throws Exception {
-        Preconditions.checkArgument(
-                schemaChange instanceof SchemaChange.AddColumn,
-                "Currently, only SchemaChange.AddColumn is supported.");
-        try {
-            schemaManager.commitChanges(schemaChange);
-        } catch (Exception e) {
-            // This is normal. For example when a table is split into multiple database tables, all
-            // these tables will be added the same column. However schemaManager can't handle
-            // duplicated column adds, so we just catch the exception and log it.
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Failed to perform schema change {}", schemaChange, e);
+        if (schemaChange instanceof SchemaChange.AddColumn) {
+            try {
+                schemaManager.commitChanges(schemaChange);
+            } catch (Exception e) {
+                // This is normal. For example when a table is split into multiple database tables,
+                // all these tables will be added the same column. However schemaManager can't
+                // handle duplicated column adds, so we just catch the exception and log it.
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to perform SchemaChange.AddColumn {}", schemaChange, e);
+                }
             }
+        } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
+            SchemaChange.UpdateColumnType updateColumnType =
+                    (SchemaChange.UpdateColumnType) schemaChange;
+            TableSchema schema =
+                    schemaManager
+                            .latest()
+                            .orElseThrow(
+                                    () ->
+                                            new RuntimeException(
+                                                    "Table does not exist. This is unexpected."));
+            int idx = schema.fieldNames().indexOf(updateColumnType.fieldName());
+            Preconditions.checkState(

Review Comment:
   It is better to add full checking to check long to int.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org