Posted to user@flink.apache.org by Sridhar Chellappa <fl...@gmail.com> on 2017/07/12 17:54:17 UTC

Getting Errors when using keyby()

I have a DataStream on which I am applying a CEP pattern and grouping the
results using keyBy(). The DataStream object is a POJO:

public class DataStreamObject {
    private String field1;
    private String field2;

    public DataStreamObject(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    public void setField1(String field1) {
        this.field1 = field1;
    }

    public String getField1() {
        return field1;
    }


    public void setField2(String field2) {
        this.field2 = field2;
    }

    public String getField2() {
        return field2;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DataStreamObject)) return false;

        DataStreamObject that = (DataStreamObject) o;

        if (!getField1().equals(that.getField1())) return false;
        return getField2().equals(that.getField2());
    }

    @Override
    public int hashCode() {
        int result = getField1().hashCode();
        result = 31 * result + getField2().hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "DataStreamObject{" +
                "field1='" + field1 + '\'' +
                ", field2='" + field2 + '\'' +
                '}';
    }
}

When I submit my Flink job, I get the following error:


This type (GenericType<com.foo.DataStreamObject>) cannot be used as key.
	org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
	org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:294)
	com.foo.Main.main(Main.java:66)


As I understand it, I do not need to implement the Key interface if the
class is a POJO (which it is).

Please help me understand where I am going wrong and suggest a fix.

Re: Getting Errors when using keyby()

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Hi Sridhar,

Your class is missing a default (no-argument) constructor, so it is not a valid POJO in Flink. Flink therefore treats it as a GenericType, which cannot be used as a key.

You can check the POJO requirements here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#pojos
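
A minimal sketch of the fix described above, assuming the rest of the class stays as posted: a public no-argument constructor is added next to the existing two-argument one. The keyBy() call itself is not shown in the original post, so it is left out here, and equals(), hashCode() and toString() are omitted for brevity.

```java
// Sketch only: the class from the original post with a public
// no-argument constructor added. Nothing else is changed.
public class DataStreamObject {
    private String field1;
    private String field2;

    // Added: public no-argument constructor, which Flink requires
    // before it will treat a class as a POJO.
    public DataStreamObject() {
    }

    // Existing convenience constructor, kept as posted.
    public DataStreamObject(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    public void setField1(String field1) {
        this.field1 = field1;
    }

    public String getField1() {
        return field1;
    }

    public void setField2(String field2) {
        this.field2 = field2;
    }

    public String getField2() {
        return field2;
    }

    // equals(), hashCode() and toString() from the original post
    // would follow here unchanged.
}
```

With this constructor in place, Flink's type extraction should recognize the class as a POJO instead of falling back to GenericType, provided the other requirements on the linked page also hold.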

