Posted to issues@flink.apache.org by "Jane Chan (Jira)" <ji...@apache.org> on 2023/04/23 07:04:00 UTC
[jira] [Updated] (FLINK-31884) Upgrade ExecNode to new version causes the old serialized plan failed to pass Json SerDe round trip
[ https://issues.apache.org/jira/browse/FLINK-31884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jane Chan updated FLINK-31884:
------------------------------
Description:
h4. How to Reproduce
First, add a test that dumps the compiled plan as JSON:
{code:java}
@Test
public void debug() {
tableEnv.executeSql("create table foo (f0 int, f1 string) with ('connector' = 'datagen')");
tableEnv.executeSql("create table bar (f0 int, f1 string) with ('connector' = 'print')");
tableEnv.compilePlanSql("insert into bar select * from foo")
.writeToFile(new File("/path/to/debug.json"));
}
{code}
The resulting JSON content is as follows:
{code:json}
{
"flinkVersion" : "1.18",
"nodes" : [ {
"id" : 1,
"type" : "stream-exec-table-source-scan_1",
"scanTableSource" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`foo`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "f0",
"dataType" : "INT"
}, {
"name" : "f1",
"dataType" : "VARCHAR(2147483647)"
} ],
"watermarkSpecs" : [ ]
},
"partitionKeys" : [ ],
"options" : {
"connector" : "datagen"
}
}
}
},
"outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
"description" : "TableSourceScan(table=[[default_catalog, default_database, foo]], fields=[f0, f1])",
"inputProperties" : [ ]
}, {
"id" : 2,
"type" : "stream-exec-sink_1",
"configuration" : {
"table.exec.sink.keyed-shuffle" : "AUTO",
"table.exec.sink.not-null-enforcer" : "ERROR",
"table.exec.sink.type-length-enforcer" : "IGNORE",
"table.exec.sink.upsert-materialize" : "AUTO"
},
"dynamicTableSink" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`bar`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "f0",
"dataType" : "INT"
}, {
"name" : "f1",
"dataType" : "VARCHAR(2147483647)"
} ],
"watermarkSpecs" : [ ]
},
"partitionKeys" : [ ],
"options" : {
"connector" : "print"
}
}
}
},
"inputChangelogMode" : [ "INSERT" ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
"description" : "Sink(table=[default_catalog.default_database.bar], fields=[f0, f1])"
} ],
"edges" : [ {
"source" : 1,
"target" : 2,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
} ]
}
{code}
Then upgrade StreamExecSink to a new version by adding a second @ExecNodeMetadata annotation with version 2:
{code:java}
@ExecNodeMetadata(
name = "stream-exec-sink",
version = 1,
consumedOptions = {
"table.exec.sink.not-null-enforcer",
"table.exec.sink.type-length-enforcer",
"table.exec.sink.upsert-materialize",
"table.exec.sink.keyed-shuffle"
},
producedTransformations = {
CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
CommonExecSink.PARTITIONER_TRANSFORMATION,
CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
CommonExecSink.SINK_TRANSFORMATION
},
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
@ExecNodeMetadata(
name = "stream-exec-sink",
version = 2,
consumedOptions = {
"table.exec.sink.not-null-enforcer",
"table.exec.sink.type-length-enforcer",
"table.exec.sink.upsert-materialize",
"table.exec.sink.keyed-shuffle"
},
producedTransformations = {
CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
CommonExecSink.PARTITIONER_TRANSFORMATION,
CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
CommonExecSink.SINK_TRANSFORMATION
},
minPlanVersion = FlinkVersion.v1_18,
minStateVersion = FlinkVersion.v1_15)
public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> {
    // existing implementation unchanged; only the annotations above differ
}
{code}
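Stacking two @ExecNodeMetadata annotations on one class only compiles because the annotation is repeatable. The standalone sketch below mimics that with a hypothetical @NodeMetadata annotation (not Flink's real one; only name and version are modeled) and reads all declared versions through standard Java reflection (Class#getAnnotationsByType). It shows how both the newest version and an explicitly requested older version can be found on the same class. NodeMetadata, NodeMetadatas and DemoSink are illustrative names only.

{code:java}
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

public class RepeatableMetadataDemo {

    // Hypothetical stand-in for @ExecNodeMetadata; only name and version are modeled.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Repeatable(NodeMetadatas.class)
    @interface NodeMetadata {
        String name();
        int version();
    }

    // Container annotation required by @Repeatable.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface NodeMetadatas {
        NodeMetadata[] value();
    }

    // Illustrative node class declaring two versions, like the upgraded StreamExecSink.
    @NodeMetadata(name = "stream-exec-sink", version = 1)
    @NodeMetadata(name = "stream-exec-sink", version = 2)
    static class DemoSink {}

    // Returns the annotation with the highest version number.
    static NodeMetadata newest(Class<?> clazz) {
        return Arrays.stream(clazz.getAnnotationsByType(NodeMetadata.class))
                .max(Comparator.comparingInt(NodeMetadata::version))
                .orElseThrow(() -> new IllegalStateException("no metadata on " + clazz));
    }

    // Looks up the annotation for an explicit version, e.g. the "_1" in "stream-exec-sink_1".
    static Optional<NodeMetadata> byVersion(Class<?> clazz, int version) {
        return Arrays.stream(clazz.getAnnotationsByType(NodeMetadata.class))
                .filter(m -> m.version() == version)
                .findFirst();
    }

    public static void main(String[] args) {
        NodeMetadata latest = newest(DemoSink.class);
        System.out.println(latest.name() + "_" + latest.version()); // stream-exec-sink_2
        System.out.println(byVersion(DemoSink.class, 1).isPresent()); // true
    }
}
{code}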
Next, load the previously saved plan and print it as JSON:
{code:java}
tableEnv.loadPlan(PlanReference.fromFile("/path/to/debug.json")).printJsonString();
{code}
The SerDe round trip is no longer idempotent: the type of the StreamExecSink node has changed from `stream-exec-sink_1` to `stream-exec-sink_2`.
{code:json}
{
"flinkVersion" : "1.18",
"nodes" : [ {
"id" : 1,
"type" : "stream-exec-table-source-scan_1",
"scanTableSource" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`foo`"
}
},
"outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
"description" : "TableSourceScan(table=[[default_catalog, default_database, foo]], fields=[f0, f1])",
"inputProperties" : [ ]
}, {
"id" : 2,
"type" : "stream-exec-sink_2",
"configuration" : {
"table.exec.sink.keyed-shuffle" : "AUTO",
"table.exec.sink.not-null-enforcer" : "ERROR",
"table.exec.sink.type-length-enforcer" : "IGNORE",
"table.exec.sink.upsert-materialize" : "AUTO"
},
"dynamicTableSink" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`bar`"
}
},
"inputChangelogMode" : [ "INSERT" ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
"description" : "Sink(table=[default_catalog.default_database.bar], fields=[f0, f1])"
} ],
"edges" : [ {
"source" : 1,
"target" : 2,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
} ]
}
{code}
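To catch this kind of regression in a test, one option is to compare the node type values of the original plan and the re-serialized plan. The sketch below is a standalone heuristic, not a Flink API: it assumes every type string has the form <name>_<version> (as in stream-exec-sink_1 in the first JSON dump) and that both lists contain the nodes in the same order. NodeTypeDiff and its methods are hypothetical names.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class NodeTypeDiff {

    // A node type such as "stream-exec-sink_1" split into its name and version.
    record NodeType(String name, int version) {
        static NodeType parse(String type) {
            int idx = type.lastIndexOf('_');
            if (idx < 0) {
                throw new IllegalArgumentException("missing version suffix: " + type);
            }
            return new NodeType(type.substring(0, idx), Integer.parseInt(type.substring(idx + 1)));
        }
    }

    // Compares node types position by position (assumes the same node order in both plans)
    // and describes every node whose name or version differs.
    static List<String> versionChanges(List<String> before, List<String> after) {
        if (before.size() != after.size()) {
            throw new IllegalArgumentException("plans have different node counts");
        }
        List<String> changes = new ArrayList<>();
        for (int i = 0; i < before.size(); i++) {
            NodeType b = NodeType.parse(before.get(i));
            NodeType a = NodeType.parse(after.get(i));
            if (!b.equals(a)) {
                changes.add(before.get(i) + " -> " + after.get(i));
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        List<String> original = List.of("stream-exec-table-source-scan_1", "stream-exec-sink_1");
        List<String> reserialized = List.of("stream-exec-table-source-scan_1", "stream-exec-sink_2");
        // prints [stream-exec-sink_1 -> stream-exec-sink_2]
        System.out.println(versionChanges(original, reserialized));
    }
}
{code}

An empty result means the round trip preserved every node version.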
h4. Root Cause
ExecNodeBase#getContextFromAnnotation always uses the newest ExecNode version for SerDe. As a result, although the deserialized CompiledPlan object is correct, #printJsonString creates a new context with the newest version when the plan is serialized back to JSON.
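The effect can be reproduced with a deliberately simplified model. In the sketch below, SinkNode, NodeContext and SUPPORTED_VERSIONS are hypothetical stand-ins for an ExecNode, its SerDe context and its @ExecNodeMetadata versions; they are not Flink classes. Serialization always derives the context from the newest declared version, so a node deserialized from a version-1 plan is written back as version 2.

{code:java}
import java.util.Collections;
import java.util.List;

public class NewestVersionBug {

    // Hypothetical SerDe context: the node name plus the version written into "type".
    record NodeContext(String name, int version) {
        String typeString() {
            return name + "_" + version;
        }
    }

    // Versions declared on the node class, mirroring the two annotations on StreamExecSink.
    static final List<Integer> SUPPORTED_VERSIONS = List.of(1, 2);

    static class SinkNode {
        // Context parsed from the JSON plan; kept but ignored during serialization.
        final NodeContext deserializedContext;

        SinkNode(NodeContext deserializedContext) {
            this.deserializedContext = deserializedContext;
        }

        // Models the described behavior: always build a fresh context from the newest version.
        NodeContext getContextFromAnnotation() {
            return new NodeContext("stream-exec-sink", Collections.max(SUPPORTED_VERSIONS));
        }

        String serializeType() {
            return getContextFromAnnotation().typeString();
        }
    }

    public static void main(String[] args) {
        SinkNode loaded = new SinkNode(new NodeContext("stream-exec-sink", 1));
        System.out.println(loaded.deserializedContext.typeString()); // stream-exec-sink_1
        System.out.println(loaded.serializeType()); // stream-exec-sink_2
    }
}
{code}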
h4. Suggested Fix
If the member variable `isCompiled` is true, #getContextFromAnnotation should return the context read from the JSON plan instead of instantiating a new one.
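A minimal sketch of the suggested fix, using a hypothetical model rather than Flink's actual ExecNodeBase (FixedSinkNode, NodeContext and SUPPORTED_VERSIONS are illustrative names): when the node was restored from a JSON plan (isCompiled is true), the context read from that plan is returned; otherwise a new context is built from the newest declared version.

{code:java}
import java.util.Collections;
import java.util.List;

public class ContextFromPlanFix {

    // Hypothetical SerDe context: the node name plus the version written into "type".
    record NodeContext(String name, int version) {
        String typeString() {
            return name + "_" + version;
        }
    }

    // Versions declared on the node class, mirroring the two annotations on StreamExecSink.
    static final List<Integer> SUPPORTED_VERSIONS = List.of(1, 2);

    static class FixedSinkNode {
        private final boolean isCompiled;
        private final NodeContext contextFromPlan; // null for freshly planned nodes

        private FixedSinkNode(boolean isCompiled, NodeContext contextFromPlan) {
            this.isCompiled = isCompiled;
            this.contextFromPlan = contextFromPlan;
        }

        // Node created by the planner for a new query.
        static FixedSinkNode fromPlanner() {
            return new FixedSinkNode(false, null);
        }

        // Node restored from a JSON plan.
        static FixedSinkNode fromJsonPlan(NodeContext context) {
            return new FixedSinkNode(true, context);
        }

        NodeContext getContextFromAnnotation() {
            if (isCompiled) {
                // Keep the version recorded in the plan so the round trip is idempotent.
                return contextFromPlan;
            }
            // Newly planned nodes still pick up the newest version.
            return new NodeContext("stream-exec-sink", Collections.max(SUPPORTED_VERSIONS));
        }
    }

    public static void main(String[] args) {
        System.out.println(FixedSinkNode.fromJsonPlan(new NodeContext("stream-exec-sink", 1))
                .getContextFromAnnotation().typeString()); // stream-exec-sink_1
        System.out.println(FixedSinkNode.fromPlanner()
                .getContextFromAnnotation().typeString()); // stream-exec-sink_2
    }
}
{code}

With this shape, new queries keep using the latest ExecNode version, while restored plans are written back exactly as they were read.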
was:
h4. How to Reproduce
h4. Root Cause
h4. Suggested Fix
> Upgrade ExecNode to new version causes the old serialized plan failed to pass Json SerDe round trip
> ---------------------------------------------------------------------------------------------------
>
> Key: FLINK-31884
> URL: https://issues.apache.org/jira/browse/FLINK-31884
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: Jane Chan
> Priority: Major
> Fix For: 1.18.0
--
This message was sent by Atlassian Jira
(v8.20.10#820010)