Posted to issues@flink.apache.org by "Jane Chan (Jira)" <ji...@apache.org> on 2023/04/23 07:04:00 UTC

[jira] [Updated] (FLINK-31884) Upgrading an ExecNode to a new version causes the old serialized plan to fail the JSON SerDe round trip

     [ https://issues.apache.org/jira/browse/FLINK-31884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jane Chan updated FLINK-31884:
------------------------------
    Description: 
h4. How to Reproduce

First, add a test that dumps the compiled plan to a JSON file.
{code:java}
@Test
public void debug() {
    tableEnv.executeSql("create table foo (f0 int, f1 string) with ('connector' = 'datagen')");
    tableEnv.executeSql("create table bar (f0 int, f1 string) with ('connector' = 'print')");
    tableEnv.compilePlanSql("insert into bar select * from foo")
            .writeToFile(new File("/path/to/debug.json"));
}
{code}
The dumped JSON plan is as follows:
{code:json}
{
  "flinkVersion" : "1.18",
  "nodes" : [ {
    "id" : 1,
    "type" : "stream-exec-table-source-scan_1",
    "scanTableSource" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`foo`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "f0",
              "dataType" : "INT"
            }, {
              "name" : "f1",
              "dataType" : "VARCHAR(2147483647)"
            } ],
            "watermarkSpecs" : [ ]
          },
          "partitionKeys" : [ ],
          "options" : {
            "connector" : "datagen"
          }
        }
      }
    },
    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
    "description" : "TableSourceScan(table=[[default_catalog, default_database, foo]], fields=[f0, f1])",
    "inputProperties" : [ ]
  }, {
    "id" : 2,
    "type" : "stream-exec-sink_1",
    "configuration" : {
      "table.exec.sink.keyed-shuffle" : "AUTO",
      "table.exec.sink.not-null-enforcer" : "ERROR",
      "table.exec.sink.type-length-enforcer" : "IGNORE",
      "table.exec.sink.upsert-materialize" : "AUTO"
    },
    "dynamicTableSink" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`bar`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "f0",
              "dataType" : "INT"
            }, {
              "name" : "f1",
              "dataType" : "VARCHAR(2147483647)"
            } ],
            "watermarkSpecs" : [ ]
          },
          "partitionKeys" : [ ],
          "options" : {
            "connector" : "print"
          }
        }
      }
    },
    "inputChangelogMode" : [ "INSERT" ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
    "description" : "Sink(table=[default_catalog.default_database.bar], fields=[f0, f1])"
  } ],
  "edges" : [ {
    "source" : 1,
    "target" : 2,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  } ]
}
{code}
Then upgrade StreamExecSink to a new version by adding a second @ExecNodeMetadata annotation:
{code:java}
@ExecNodeMetadata(
        name = "stream-exec-sink",
        version = 1,
        consumedOptions = {
            "table.exec.sink.not-null-enforcer",
            "table.exec.sink.type-length-enforcer",
            "table.exec.sink.upsert-materialize",
            "table.exec.sink.keyed-shuffle"
        },
        producedTransformations = {
            CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
            CommonExecSink.PARTITIONER_TRANSFORMATION,
            CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
            CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
            CommonExecSink.SINK_TRANSFORMATION
        },
        minPlanVersion = FlinkVersion.v1_15,
        minStateVersion = FlinkVersion.v1_15)
@ExecNodeMetadata(
        name = "stream-exec-sink",
        version = 2,
        consumedOptions = {
            "table.exec.sink.not-null-enforcer",
            "table.exec.sink.type-length-enforcer",
            "table.exec.sink.upsert-materialize",
            "table.exec.sink.keyed-shuffle"
        },
        producedTransformations = {
            CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
            CommonExecSink.PARTITIONER_TRANSFORMATION,
            CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
            CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
            CommonExecSink.SINK_TRANSFORMATION
        },
        minPlanVersion = FlinkVersion.v1_18,
        minStateVersion = FlinkVersion.v1_15)
public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> {
}
{code}
Finally, load the previously dumped plan and print it as JSON:
{code:java}
tableEnv.loadPlan(PlanReference.fromFile("/path/to/debug.json")).printJsonString();
{code}
The SerDe round trip is no longer idempotent: StreamExecSink is re-serialized as version 2 (note the node type stream-exec-sink_2 below, and see the assertion sketch after the JSON).
{code:json}
{
  "flinkVersion" : "1.18",
  "nodes" : [ {
    "id" : 1,
    "type" : "stream-exec-table-source-scan_1",
    "scanTableSource" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`foo`"
      }
    },
    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
    "description" : "TableSourceScan(table=[[default_catalog, default_database, foo]], fields=[f0, f1])",
    "inputProperties" : [ ]
  }, {
    "id" : 2,
    "type" : "stream-exec-sink_2",
    "configuration" : {
      "table.exec.sink.keyed-shuffle" : "AUTO",
      "table.exec.sink.not-null-enforcer" : "ERROR",
      "table.exec.sink.type-length-enforcer" : "IGNORE",
      "table.exec.sink.upsert-materialize" : "AUTO"
    },
    "dynamicTableSink" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`bar`"
      }
    },
    "inputChangelogMode" : [ "INSERT" ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
    "description" : "Sink(table=[default_catalog.default_database.bar], fields=[f0, f1])"
  } ],
  "edges" : [ {
    "source" : 1,
    "target" : 2,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  } ]
}
{code}
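For completeness, the loss of idempotence can be asserted directly. The following is a minimal sketch reusing the test setup from the debug() test above; it assumes that writeToFile and CompiledPlan#asJsonString produce the same pretty-printed formatting. With the dual-annotated StreamExecSink it fails, because the node type is rewritten from stream-exec-sink_1 to stream-exec-sink_2.
{code:java}
@Test
public void roundTripShouldBeIdempotent() throws Exception {
    // JSON as originally dumped by the debug() test above
    String original =
            new String(java.nio.file.Files.readAllBytes(
                    java.nio.file.Paths.get("/path/to/debug.json")));
    // JSON produced by deserializing and re-serializing the same plan
    String reserialized =
            tableEnv.loadPlan(PlanReference.fromFile("/path/to/debug.json"))
                    .asJsonString();
    // Fails once StreamExecSink carries a version-2 annotation:
    // "stream-exec-sink_1" is rewritten to "stream-exec-sink_2"
    assertEquals(original, reserialized);
}
{code}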
h4. Root Cause

ExecNodeBase#getContextFromAnnotation always resolves the newest ExecNode version for SerDe. As a result, although the deserialized CompiledPlan object itself is correct, re-serializing it via #printJsonString creates a new context with the newest version.
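Schematically, the current behavior looks like the following sketch (illustrative only; the exact body in ExecNodeBase may differ):
{code:java}
// Illustrative sketch of the current behavior (not the exact implementation).
// ExecNodeContext.newContext(Class) resolves the @ExecNodeMetadata annotation
// with the highest version, so a node deserialized as "stream-exec-sink_1"
// is re-serialized as "stream-exec-sink_2" once a version-2 annotation exists.
protected ExecNodeContext getContextFromAnnotation() {
    return ExecNodeContext.newContext(this.getClass()).withId(getId());
}
{code}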
 
h4. Suggested Fix

If the member variable `isCompiled` is true, #getContextFromAnnotation should return the context that was read from the JSON plan instead of instantiating a new one, as in the sketch below.
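A minimal sketch of that fix, assuming a field `context` holds the ExecNodeContext deserialized from the JSON plan:
{code:java}
// Sketch of the suggested fix (assumed field names: isCompiled, context).
// When the node was restored from a compiled plan, reuse the deserialized
// context instead of resolving a fresh one from the newest annotation.
protected ExecNodeContext getContextFromAnnotation() {
    return isCompiled
            ? context
            : ExecNodeContext.newContext(this.getClass()).withId(getId());
}
{code}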
 

  was:
h4. How to Reproduce
h4. Root Cause

 
h4. Suggested Fix

 


> Upgrading an ExecNode to a new version causes the old serialized plan to fail the JSON SerDe round trip
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31884
>                 URL: https://issues.apache.org/jira/browse/FLINK-31884
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>            Reporter: Jane Chan
>            Priority: Major
>             Fix For: 1.18.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)