Posted to issues@flink.apache.org by "RocMarshal (Jira)" <ji...@apache.org> on 2022/12/27 11:29:00 UTC
[jira] [Updated] (FLINK-30511) Ignore the Exception in user-timer Triggerble when recover form state.
[ https://issues.apache.org/jira/browse/FLINK-30511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
RocMarshal updated FLINK-30511:
-------------------------------
Description:
* Code segment:
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class OnTimerDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.numberOfTaskSlots", "4");
        conf.setString("state.checkpoint-storage", "filesystem");
        conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
        conf.setString("execution.checkpointing.interval", "30s");
        //conf.setString("execution.savepoint.path", "file:///tmp/flinkjob/159561b8c97c9e0b4f9eeb649086796a/chk-1"); // Anchor-A:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        EnvironmentSettings envSetting = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting);

        // Source table that generates one random row per second.
        String sourceDDL = "CREATE TABLE orders (\n" +
                " id INT,\n" +
                " app INT,\n" +
                " user_id STRING" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.app.min'='1',\n" +
                " 'fields.app.max'='10',\n" +
                " 'fields.user_id.length'='10'\n" +
                ")";
        tableEnv.executeSql(sourceDDL);
        Table query = tableEnv.sqlQuery("select * from orders");
        DataStream<Row> rowDataStream = tableEnv.toAppendStream(query, Row.class);

        // Declared output types of the process function.
        TypeInformation<?>[] returnTypes = new TypeInformation[4];
        returnTypes[0] = Types.INT;
        returnTypes[1] = Types.INT; // Anchor-B:
        returnTypes[2] = Types.INT;
        returnTypes[3] = Types.INT;

        rowDataStream.keyBy(new KeySelector<Row, String>() {
            @Override
            public String getKey(Row value) throws Exception {
                return value.getFieldAs(2);
            }
        }).process(new KeyedProcessFunction<String, Row, Row>() {
            private Row firstRow;

            @Override
            public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
                if (firstRow == null) {
                    firstRow = value;
                }
                // Register a processing-time timer 3 seconds from now for every element.
                ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 3000);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
                // All four fields are set to Integer values.
                Row colRow = new Row(4);
                colRow.setField(0, 0);
                colRow.setField(1, 1);
                colRow.setField(2, 2);
                colRow.setField(3, 3);
                out.collect(colRow); // Anchor-C
            }
        }).name("TargetTestUDF")
                .returns(new RowTypeInfo(returnTypes))
                .print();

        env.execute(OnTimerDemo.class.getSimpleName());
    }
}
{code}
* Reproduction steps
** Run the job without any restored state.
** Note the path of the latest completed checkpoint as 'checkpoint-path-a'.
** Stop the job.
** Put the actual value of 'checkpoint-path-a' into the 'Anchor-A' line and uncomment that line.
** At the 'Anchor-B' line, change 'returnTypes[1] = Types.INT;' to 'returnTypes[1] = Types.LONG;'.
** Set a breakpoint in the 'StreamTask#handleAsyncException' method.
** Run the job again. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' raised at the 'Anchor-C' line is ignored inside 'StreamTask#handleAsyncException'.
** As a result, the framework never catches this exception in this case.
* Root cause:
** !截屏2022-12-27 18.51.12.png!
** When the job is started from state data, 'Task#restoreAndInvoke' is called. The exception 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' is ignored in the 'handleAsyncException' method shown above instead of being caught by the catch block of 'Task#restoreAndInvoke'.
!截屏2022-12-27 19.20.00.png!
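For illustration, the exception itself comes from a plain type mismatch: after the 'Anchor-B' change, field 1 is declared as LONG, but 'colRow.setField(1, 1)' still stores an Integer. The following is a minimal plain-Java sketch and not Flink code; 'CastMismatchSketch' and 'serializeAsLong' are hypothetical names standing in for whichever component handles the field according to its declared type.

```java
import java.nio.ByteBuffer;

class CastMismatchSketch {

    /** Hypothetical stand-in for code that was selected from the declared field type LONG. */
    static byte[] serializeAsLong(Object field) {
        // Fails with ClassCastException when the runtime value is an Integer.
        Long value = (Long) field;
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Returns "ok" on success, or the simple name of the exception that was thrown. */
    static String trySerialize(Object field) {
        try {
            serializeAsLong(field);
            return "ok";
        } catch (ClassCastException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(1L)); // value matches the declared type
        System.out.println(trySerialize(1));  // Integer where a Long was declared
    }
}
```

The first call succeeds and the second one reports a ClassCastException, which is the same kind of failure the job hits at 'Anchor-C'.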
Could this be considered a gap in the framework's exception handling?
If so, I would prefer to re-throw the exception in 'StreamTask#handleAsyncException', which matches the intent of 'Task#restoreAndInvoke'.
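The difference between ignoring and re-throwing can be sketched in plain Java. This is a simplified model under stated assumptions, not Flink's implementation: 'SketchTask', its 'running' flag, and the 'rethrowWhenNotRunning' switch are hypothetical, and the assumption that the handler drops failures while the task is not yet running is made only for the sake of the example.

```java
class AsyncExceptionSketch {

    /** Hypothetical stand-in for a task; it does not mirror StreamTask's real code. */
    static final class SketchTask {
        private final boolean rethrowWhenNotRunning;
        // Assumption: the task is not yet marked as running while state is restored.
        private final boolean running = false;

        SketchTask(boolean rethrowWhenNotRunning) {
            this.rethrowWhenNotRunning = rethrowWhenNotRunning;
        }

        void handleAsyncException(String message, Throwable cause) {
            if (running || rethrowWhenNotRunning) {
                // Surface the failure to the caller so that it can fail the task.
                throw new IllegalStateException(message, cause);
            }
            // Otherwise the failure is dropped silently.
        }

        /** Models a user timer that fires while the task is being restored. */
        void fireTimerDuringRestore(Runnable onTimer) {
            try {
                onTimer.run();
            } catch (RuntimeException e) {
                handleAsyncException("timer callback failed", e);
            }
        }
    }

    /** Describes what the caller of the restore step gets to see. */
    static String restore(boolean rethrow) {
        SketchTask task = new SketchTask(rethrow);
        try {
            task.fireTimerDuringRestore(() -> {
                Object field = Integer.valueOf(1);
                Long unused = (Long) field; // same mismatch as in the report
            });
            return "restore completed, failure lost";
        } catch (IllegalStateException e) {
            return "restore failed: " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(restore(false)); // ignoring variant
        System.out.println(restore(true));  // re-throwing variant
    }
}
```

In the ignoring variant the caller cannot tell that the timer failed. In the re-throwing variant the ClassCastException reaches the caller's catch block, which is the behavior proposed above.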
Thank you.
> Ignore the Exception in user-timer Triggerble when recover form state.
> ----------------------------------------------------------------------
>
> Key: FLINK-30511
> URL: https://issues.apache.org/jira/browse/FLINK-30511
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.16.0
> Environment: Flink 1.16.0
> java8
> deployment Mode: miniCluster in IDC; standalone, yarn-application.
> Reporter: RocMarshal
> Priority: Minor
> Attachments: 截屏2022-12-27 18.51.12.png, 截屏2022-12-27 19.20.00.png
--
This message was sent by Atlassian Jira
(v8.20.10#820010)