You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Sihua Zhou (JIRA)" <ji...@apache.org> on 2017/07/14 07:45:00 UTC

[jira] [Updated] (FLINK-7180) CoGroupStream perform checkpoint failed

     [ https://issues.apache.org/jira/browse/FLINK-7180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-7180:
------------------------------
    Description: 
When using the CoGroup api and enable the checkpoint, Job will failed when performing checkpoint, e.g:
{code:java}
        input1.coGroup(input2)
                .where(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .equalTo(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .window(SlothJoinWindow.create())
                .trigger(new SlothWindowTrigger(0))
                .apply(new CoGroupFunction<String, String, String>() {
                    @Override
                    public void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> out) throws Exception {
                        String outputStr = "first:" + first + " , second:" + second;
                        System.out.println(outputStr);
                        out.collect(outputStr);
                    }
                })
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .print();
{code}

  was:
When using the CoGroup api and enable the checkpoint, Job will failed when perform checkpoint, e.g:
{code:java}
        input1.coGroup(input2)
                .where(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .equalTo(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .window(SlothJoinWindow.create())
                .trigger(new SlothWindowTrigger(0))
                .apply(new CoGroupFunction<String, String, String>() {
                    @Override
                    public void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> out) throws Exception {
                        String outputStr = "first:" + first + " , second:" + second;
                        System.out.println(outputStr);
                        out.collect(outputStr);
                    }
                })
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .print();
{code}


> CoGroupStream perform checkpoint failed
> ---------------------------------------
>
>                 Key: FLINK-7180
>                 URL: https://issues.apache.org/jira/browse/FLINK-7180
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>
> When using the CoGroup api and enable the checkpoint, Job will failed when performing checkpoint, e.g:
> {code:java}
>         input1.coGroup(input2)
>                 .where(new KeySelector<String, String>() {
>                     @Override
>                     public String getKey(String value) throws Exception {
>                         return value;
>                     }
>                 })
>                 .equalTo(new KeySelector<String, String>() {
>                     @Override
>                     public String getKey(String value) throws Exception {
>                         return value;
>                     }
>                 })
>                 .window(SlothJoinWindow.create())
>                 .trigger(new SlothWindowTrigger(0))
>                 .apply(new CoGroupFunction<String, String, String>() {
>                     @Override
>                     public void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> out) throws Exception {
>                         String outputStr = "first:" + first + " , second:" + second;
>                         System.out.println(outputStr);
>                         out.collect(outputStr);
>                     }
>                 })
>                 .keyBy(new KeySelector<String, String>() {
>                     @Override
>                     public String getKey(String value) throws Exception {
>                         return value;
>                     }
>                 })
>                 .print();
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)