You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Dmytro Dragan <dd...@softserveinc.com> on 2020/07/23 13:14:56 UTC

Flink 1.11 Simple pipeline (data stream -> table with egg -> data stream) failed

Hi All,

We are working on migration existing pipelines from Flink 1.10 to Flink 1.11.
We are using Blink planner and have unified pipelines which can be used in stream and batch mode.

Stream pipelines works as expected, but batch once fail on Flink 1.11 if they have any table aggregation transformation.

Simple example of failed pipeline:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

TableConfig tableConfig = new TableConfig();
tableConfig.setIdleStateRetentionTime(
        org.apache.flink.api.common.time.Time.minutes(10),
        org.apache.flink.api.common.time.Time.minutes(30)
);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();

// is created using work around with ignoring settings.isStreamingMode() check
StreamTableEnvironment tEnv = create(env, settings, tableConfig);

DataStreamSource<A> streamSource = env.fromCollection(asList(new A("1"), new A("2")));

Table table = tEnv.fromDataStream(streamSource);
tEnv.createTemporaryView("A", table);

String sql = "select s from A group by s";

tEnv
         .toRetractStream(tEnv.sqlQuery(sql), Row.class)
         .flatMap(new RetractFlatMap())
         .map(Row::toString)
         .addSink(new TestSinkFunction<>());

env.execute("");

values.forEach(System.out::println);

Exception:
Caused by: java.lang.IllegalStateException: Trying to consume an input partition whose producer is not ready (result type: BLOCKING, partition consumable: false, producer state: DEPLOYING, partition id: 9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).
                at org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)
                …

Adding StreamTableEnvironment execute does not help.

Could you please advise what I`m missing?



Re: Flink 1.11 Simple pipeline (data stream -> table with egg -> data stream) failed

Posted by Timo Walther <tw...@apache.org>.
Hi Dmytro,

one major difference between legacy and Blink planner is that the Blink 
planner is not build on top of DataStream API. It uses features of lower 
levels (StreamOperator, Transformation). In the mid-term we want to 
remove the check and make Table API and DataStream API 100% back and 
forth compatible for batch and streaming.

"there is no way to create/retract stream": What are you planning to do 
with the created stream? If you want to sink it into an external system, 
the new FLIP-95 sinks support all changelog semantics now.

Regards,
Timo


On 24.07.20 17:49, Dmytro Dragan wrote:
> Hi Timo,
> Thank you for response.
> 
> Well, it was working.
> We have a number of pipelines in production which reuse DataStream and Table API parts on Flink 1.10, both for stream and batch.
> The same that simple case without aggregation would work in Flink 1.11
> 
> But let`s assume there are some incompatible changes and such approach would not work anymore.
> 
> In case of TableEnvironment there is no way to create/retract stream.
> I would assume that it is possible to wrapped stream in bounded StreamTableSource/ StreamTableSink
> and use deprecated TableEnvironment methods to register them, but I`m wonder if there is a better way to do it.
> 
> It sounds a quite strange that with having Blink planner which optimise DataStream pipelines for stream and batch jobs,
> there is necessity to write the same things on DataStream and DataSet API.
> 
> 
> On 24/07/2020, 15:36, "Timo Walther" <tw...@apache.org> wrote:
> 
>      Hi Dmytro,
>      
>      `StreamTableEnvironment` does not support batch mode currently. Only
>      `TableEnvironment` supports the unified story. I saw that you disabled
>      the check in the `create()` method. This check exists for a reason.
>      
>      For batch execution, the planner sets specific properties on the stream
>      graph that the StreamExecutionEnvironment cannot handle (e.g. blocking
>      inputs). My guess would be that this is the reason for your exception.
>      
>      Have you tried to use the unified `TableEnvironment`?
>      
>      Regards,
>      Timo
>      
>      
>      
>      
>      On 23.07.20 15:14, Dmytro Dragan wrote:
>      > Hi All,
>      >
>      > We are working on migration existing pipelines from Flink 1.10 to Flink
>      > 1.11.
>      >
>      > We are using Blink planner and have unified pipelines which can be used
>      > in stream and batch mode.
>      >
>      > Stream pipelines works as expected, but batch once fail on Flink 1.11 if
>      > they have any table aggregation transformation.
>      >
>      > Simple example of failed pipeline:
>      >
>      > StreamExecutionEnvironment env =
>      > StreamExecutionEnvironment./getExecutionEnvironment/();
>      > env.setStreamTimeCharacteristic(TimeCharacteristic./ProcessingTime/);
>      >
>      > TableConfig tableConfig = new TableConfig();
>      > tableConfig.setIdleStateRetentionTime(
>      >          org.apache.flink.api.common.time.Time./minutes/(10),
>      > org.apache.flink.api.common.time.Time./minutes/(30)
>      > );
>      > EnvironmentSettings settings =
>      > EnvironmentSettings./newInstance/().useBlinkPlanner().inBatchMode().build();
>      >
>      > // is created using work around with ignoring settings.isStreamingMode()
>      > check
>      > StreamTableEnvironment tEnv = /create/(env, settings, tableConfig);
>      >
>      > DataStreamSource<A> streamSource = env.fromCollection(/asList/(new
>      > A("1"), new A("2")));
>      >
>      > Table table = tEnv.fromDataStream(streamSource);
>      > tEnv.createTemporaryView("A", table);
>      >
>      > String sql = "select s from A group by s";
>      >
>      > tEnv
>      > .toRetractStream(tEnv.sqlQuery(sql), Row.class)
>      >           .flatMap(new RetractFlatMap())
>      >           .map(Row::toString)
>      >           .addSink(new TestSinkFunction<>());
>      >
>      > env.execute("");
>      >
>      > /values/.forEach(System./out/::println);
>      >
>      > Exception:
>      >
>      > Caused by: java.lang.IllegalStateException: Trying to consume an input
>      > partition whose producer is not ready (result type: BLOCKING, partition
>      > consumable: false, producer state: DEPLOYING, partition id:
>      > 9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).
>      >
>      >                  at
>      > org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)
>      >
>      >                  …
>      >
>      > Adding StreamTableEnvironment execute does not help.
>      >
>      > Could you please advise what I`m missing?
>      >
>      
>      
>      
> 


Re: Flink 1.11 Simple pipeline (data stream -> table with egg -> data stream) failed

Posted by Dmytro Dragan <dd...@softserveinc.com>.
Hi Timo,
Thank you for response.

Well, it was working.
We have a number of pipelines in production which reuse DataStream and Table API parts on Flink 1.10, both for stream and batch.
The same that simple case without aggregation would work in Flink 1.11

But let`s assume there are some incompatible changes and such approach would not work anymore.

In case of TableEnvironment there is no way to create/retract stream.
I would assume that it is possible to wrapped stream in bounded StreamTableSource/ StreamTableSink 
and use deprecated TableEnvironment methods to register them, but I`m wonder if there is a better way to do it.

It sounds a quite strange that with having Blink planner which optimise DataStream pipelines for stream and batch jobs, 
there is necessity to write the same things on DataStream and DataSet API.


On 24/07/2020, 15:36, "Timo Walther" <tw...@apache.org> wrote:

    Hi Dmytro,
    
    `StreamTableEnvironment` does not support batch mode currently. Only 
    `TableEnvironment` supports the unified story. I saw that you disabled 
    the check in the `create()` method. This check exists for a reason.
    
    For batch execution, the planner sets specific properties on the stream 
    graph that the StreamExecutionEnvironment cannot handle (e.g. blocking 
    inputs). My guess would be that this is the reason for your exception.
    
    Have you tried to use the unified `TableEnvironment`?
    
    Regards,
    Timo
    
    
    
    
    On 23.07.20 15:14, Dmytro Dragan wrote:
    > Hi All,
    > 
    > We are working on migration existing pipelines from Flink 1.10 to Flink 
    > 1.11.
    > 
    > We are using Blink planner and have unified pipelines which can be used 
    > in stream and batch mode.
    > 
    > Stream pipelines works as expected, but batch once fail on Flink 1.11 if 
    > they have any table aggregation transformation.
    > 
    > Simple example of failed pipeline:
    > 
    > StreamExecutionEnvironment env = 
    > StreamExecutionEnvironment./getExecutionEnvironment/();
    > env.setStreamTimeCharacteristic(TimeCharacteristic./ProcessingTime/);
    > 
    > TableConfig tableConfig = new TableConfig();
    > tableConfig.setIdleStateRetentionTime(
    >          org.apache.flink.api.common.time.Time./minutes/(10),
    > org.apache.flink.api.common.time.Time./minutes/(30)
    > );
    > EnvironmentSettings settings = 
    > EnvironmentSettings./newInstance/().useBlinkPlanner().inBatchMode().build();
    > 
    > // is created using work around with ignoring settings.isStreamingMode() 
    > check
    > StreamTableEnvironment tEnv = /create/(env, settings, tableConfig);
    > 
    > DataStreamSource<A> streamSource = env.fromCollection(/asList/(new 
    > A("1"), new A("2")));
    > 
    > Table table = tEnv.fromDataStream(streamSource);
    > tEnv.createTemporaryView("A", table);
    > 
    > String sql = "select s from A group by s";
    > 
    > tEnv
    > .toRetractStream(tEnv.sqlQuery(sql), Row.class)
    >           .flatMap(new RetractFlatMap())
    >           .map(Row::toString)
    >           .addSink(new TestSinkFunction<>());
    > 
    > env.execute("");
    > 
    > /values/.forEach(System./out/::println);
    > 
    > Exception:
    > 
    > Caused by: java.lang.IllegalStateException: Trying to consume an input 
    > partition whose producer is not ready (result type: BLOCKING, partition 
    > consumable: false, producer state: DEPLOYING, partition id: 
    > 9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).
    > 
    >                  at 
    > org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)
    > 
    >                  …
    > 
    > Adding StreamTableEnvironment execute does not help.
    > 
    > Could you please advise what I`m missing?
    > 
    
    
    


Re: Flink 1.11 Simple pipeline (data stream -> table with egg -> data stream) failed

Posted by Timo Walther <tw...@apache.org>.
Hi Dmytro,

`StreamTableEnvironment` does not support batch mode currently. Only 
`TableEnvironment` supports the unified story. I saw that you disabled 
the check in the `create()` method. This check exists for a reason.

For batch execution, the planner sets specific properties on the stream 
graph that the StreamExecutionEnvironment cannot handle (e.g. blocking 
inputs). My guess would be that this is the reason for your exception.

Have you tried to use the unified `TableEnvironment`?

Regards,
Timo




On 23.07.20 15:14, Dmytro Dragan wrote:
> Hi All,
> 
> We are working on migration existing pipelines from Flink 1.10 to Flink 
> 1.11.
> 
> We are using Blink planner and have unified pipelines which can be used 
> in stream and batch mode.
> 
> Stream pipelines works as expected, but batch once fail on Flink 1.11 if 
> they have any table aggregation transformation.
> 
> Simple example of failed pipeline:
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment./getExecutionEnvironment/();
> env.setStreamTimeCharacteristic(TimeCharacteristic./ProcessingTime/);
> 
> TableConfig tableConfig = new TableConfig();
> tableConfig.setIdleStateRetentionTime(
>          org.apache.flink.api.common.time.Time./minutes/(10),
> org.apache.flink.api.common.time.Time./minutes/(30)
> );
> EnvironmentSettings settings = 
> EnvironmentSettings./newInstance/().useBlinkPlanner().inBatchMode().build();
> 
> // is created using work around with ignoring settings.isStreamingMode() 
> check
> StreamTableEnvironment tEnv = /create/(env, settings, tableConfig);
> 
> DataStreamSource<A> streamSource = env.fromCollection(/asList/(new 
> A("1"), new A("2")));
> 
> Table table = tEnv.fromDataStream(streamSource);
> tEnv.createTemporaryView("A", table);
> 
> String sql = "select s from A group by s";
> 
> tEnv
> .toRetractStream(tEnv.sqlQuery(sql), Row.class)
>           .flatMap(new RetractFlatMap())
>           .map(Row::toString)
>           .addSink(new TestSinkFunction<>());
> 
> env.execute("");
> 
> /values/.forEach(System./out/::println);
> 
> Exception:
> 
> Caused by: java.lang.IllegalStateException: Trying to consume an input 
> partition whose producer is not ready (result type: BLOCKING, partition 
> consumable: false, producer state: DEPLOYING, partition id: 
> 9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).
> 
>                  at 
> org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)
> 
>                  …
> 
> Adding StreamTableEnvironment execute does not help.
> 
> Could you please advise what I`m missing?
>