Posted to user@flink.apache.org by Heidi Hazem Mohamed <H....@nu.edu.eg> on 2020/09/09 11:08:07 UTC

Slow Performance inquiry

Dear all,

I am writing a Flink program (a recommender system) that needs a rating matrix as state. Because the matrix is very sparse, I implemented a sparse binary matrix that stores only the ones rather than the whole matrix, in order to save memory. I use it as the data type of a ValueState, but unexpectedly the performance became terrible and the job became very slow. Does anyone have a suggestion for finding out what the problem is?

My first implementation of the rating matrix state:

MapState<String, Map<String, Float>> ratingMatrix;

The second implementation (the slow one) of the rating matrix state:

ValueState<SparseBinaryMatrix> userItemRatingHistory;

and this is a part of the SparseBinaryMatrix class:


import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;

public class SparseBinaryMatrix implements Serializable {

    // One sparse Row per matrix row
    private ArrayList<Row> content;

    private int rowLength;

    // Lookups between labels and indices, in both directions
    private HashMap<String, Integer> columnLabels;
    private HashMap<Integer, String> inverseColumnLabels;

    private HashMap<String, Integer> rowLabels;
    private HashMap<Integer, String> inverseRowLabels;

    private enum LabelerType { Row, Column }

    public Integer colNumber;
    public Integer rowNumber;

    // This constructor initializes the matrix with zeros
    public SparseBinaryMatrix(int rows, int columns) {
        content = new ArrayList<>(rows);
        rowLength = columns;
        // for (int i = 0; i < rows; i++)
        //     content.add(new Row(columns));
    }

    // ... (rest of the class not shown)
}


Could depending on another class (Row) lead to this terrible performance? Row is a class I implemented myself, and this is part of it:


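import java.io.Serializable;
import java.util.ArrayList;

public class Row implements Serializable {
    // This is an alternating sorted array
    private ArrayList<Integer> content;
    private int length = 0;

    public Row(int numbColumns) {
        length = numbColumns;
        // content must be initialized before setColumnToZero touches it
        content = new ArrayList<>();
        for (int i = 0; i < numbColumns; i++)
            setColumnToZero(i);
    }

    public Row(int[] initialValues) {
        length = initialValues.length;
        content = new ArrayList<>(length);
        for (int i = 0; i < length; i++)
            setColumn(i, initialValues[i]);
    }

    // ... (setColumn, setColumnToZero and the rest of the class not shown)
}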

Regards,

Heidy
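
For readers following the thread, here is a minimal sketch of how a sparse binary row that "saves only the ones" could be stored. It is not Heidy's actual Row class (its setColumn and setColumnToZero methods are not shown in the post); the class name SortedIndexRow and all of its methods are hypothetical. It keeps the column indices of the ones in a sorted int[] and uses binary search for lookups, which avoids the boxing of an ArrayList<Integer>.

```java
import java.util.Arrays;

/** Hypothetical sparse binary row: keeps only the column indices that hold a 1, in ascending order. */
final class SortedIndexRow {
    private int[] ones = new int[0]; // sorted, no duplicates
    private final int length;        // logical number of columns

    SortedIndexRow(int length) {
        this.length = length;
    }

    /** Returns true if the given column holds a 1. */
    boolean get(int column) {
        checkRange(column);
        return Arrays.binarySearch(ones, column) >= 0;
    }

    /** Sets the given column to 1, keeping the index array sorted. */
    void setOne(int column) {
        checkRange(column);
        int pos = Arrays.binarySearch(ones, column);
        if (pos >= 0) return; // already a 1
        int insertAt = -pos - 1; // binarySearch encodes the insertion point
        int[] grown = new int[ones.length + 1];
        System.arraycopy(ones, 0, grown, 0, insertAt);
        grown[insertAt] = column;
        System.arraycopy(ones, insertAt, grown, insertAt + 1, ones.length - insertAt);
        ones = grown;
    }

    /** Sets the given column to 0 by dropping its index, if present. */
    void setZero(int column) {
        checkRange(column);
        int pos = Arrays.binarySearch(ones, column);
        if (pos < 0) return; // already a 0
        int[] shrunk = new int[ones.length - 1];
        System.arraycopy(ones, 0, shrunk, 0, pos);
        System.arraycopy(ones, pos + 1, shrunk, pos, ones.length - pos - 1);
        ones = shrunk;
    }

    /** Number of ones stored in this row. */
    int countOnes() {
        return ones.length;
    }

    private void checkRange(int column) {
        if (column < 0 || column >= length)
            throw new IndexOutOfBoundsException("column " + column);
    }
}
```

Memory use grows with the number of ones rather than with the number of columns. Each update copies the array, so this layout suits rows that are read more often than they are written.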

Re: Slow Performance inquiry

Posted by Timo Walther <tw...@apache.org>.
Hi Heidy,

I agree with David that a heap-based state backend would reduce the
serialization overhead a lot.

If you would like to optimize your serialization further, I would recommend
looking at the type that comes out of TypeInformation.of with a debugger.
You can find a list of all types and a brief explanation in
`org.apache.flink.api.common.typeinfo.Types`. Selecting the type for a
state should be a well-defined decision (also in terms of schema
evolution/backwards compatibility).

Regards,
Timo



Re: Slow Performance inquiry

Posted by David Anderson <da...@alpinegizmo.com>.
Heidy, which state backend are you using? With RocksDB, Flink will have to
do ser/de on every access and update, but with the FsStateBackend your
sparse matrix will sit in memory and only has to be serialized during
checkpointing.

David
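
To illustrate David's point, here is a small plain-Java simulation; it is not Flink code. The class SerializingStore is a hypothetical stand-in for a backend that keeps only bytes, and Java serialization stands in for whatever serializer Flink would actually pick. The sketch counts how often the whole value is serialized and deserialized when a single large value is read, changed, and written back once per record.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

final class StateAccessDemo {

    /** Hypothetical stand-in for a serializing backend: only bytes are kept between calls. */
    static final class SerializingStore<T extends Serializable> {
        private byte[] bytes;
        int serializations;
        int deserializations;

        /** Every read rebuilds the whole object from bytes. */
        @SuppressWarnings("unchecked")
        T get() {
            if (bytes == null) return null;
            deserializations++;
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (T) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }

        /** Every write turns the whole object back into bytes. */
        void update(T value) {
            serializations++;
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
            bytes = buffer.toByteArray();
        }
    }

    /** Adds one rating per simulated record, reading and writing the whole map each time. */
    static SerializingStore<HashMap<String, Float>> runSerializing(int ratings) {
        SerializingStore<HashMap<String, Float>> store = new SerializingStore<>();
        store.update(new HashMap<>());
        for (int i = 0; i < ratings; i++) {
            HashMap<String, Float> matrix = store.get(); // full deserialization
            matrix.put("item-" + i, 1.0f);
            store.update(matrix);                        // full serialization
        }
        return store;
    }

    public static void main(String[] args) {
        SerializingStore<HashMap<String, Float>> store = runSerializing(1_000);
        System.out.println("serializations=" + store.serializations
                + ", deserializations=" + store.deserializations);
    }
}
```

With 1,000 simulated records the store performs 1,001 serializations and 1,000 deserializations, each over an ever larger map, so the total work grows roughly quadratically with the size of the value. A heap-based backend would hold the live object and skip all of this between checkpoints.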


Re: Slow Performance inquiry

Posted by Heidi Hazem Mohamed <H....@nu.edu.eg>.
Hi Walther,

Many thanks for your answer. I declared the state type as below:


ValueStateDescriptor<SparseBinaryMatrix> descriptor =
        new ValueStateDescriptor<>(
                "Rating Matrix",
                TypeInformation.of(new TypeHint<SparseBinaryMatrix>() {}));

Is there a better way?

Regards,

Heidy



Re: Slow Performance inquiry

Posted by Timo Walther <tw...@apache.org>.
Hi Hazem,

I guess your performance is mostly driven by the serialization overhead 
in this case. How do you declare your state type?

Flink comes with different serializers. Not all of them are extracted 
automatically when using reflective extraction methods:

- Note that declaring `Serializable` has no effect for Flink, other
than NOT using Flink's efficient serializers.
- Flink's POJO serializer only works with a default constructor present.
- Row needs an explicit declaration of fields.

Regards,
Timo
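
As a rough illustration of the POJO point in Timo's list, the plain-Java sketch below uses reflection to check whether a class has the shape the POJO serializer expects. The helper looksLikePojo and both sample classes are hypothetical, and the check is a simplification: it does not call Flink's type extraction, and it is stricter than Flink's rules, which also accept private fields that have getters and setters.

```java
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class PojoShapeCheck {

    /** Shaped like the class in the thread: only a two-argument constructor, private fields. */
    public static class MatrixWithoutDefaultCtor implements Serializable {
        private int rowLength;

        public MatrixWithoutDefaultCtor(int rows, int columns) {
            rowLength = columns;
        }
    }

    /** Same data, reshaped: public no-argument constructor and public, non-final fields. */
    public static class MatrixPojo {
        public int rowLength;

        public MatrixPojo() {
        }
    }

    static boolean hasPublicNoArgConstructor(Class<?> type) {
        try {
            type.getConstructor(); // only finds public constructors
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Simplified rule: every instance field must be public and non-final. */
    static boolean allFieldsPublic(Class<?> type) {
        for (Field field : type.getDeclaredFields()) {
            int modifiers = field.getModifiers();
            if (field.isSynthetic() || Modifier.isStatic(modifiers) || Modifier.isTransient(modifiers)) {
                continue; // not part of the serialized shape
            }
            if (!Modifier.isPublic(modifiers) || Modifier.isFinal(modifiers)) {
                return false;
            }
        }
        return true;
    }

    static boolean looksLikePojo(Class<?> type) {
        return Modifier.isPublic(type.getModifiers())
                && hasPublicNoArgConstructor(type)
                && allFieldsPublic(type);
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(MatrixWithoutDefaultCtor.class)); // false
        System.out.println(looksLikePojo(MatrixPojo.class));               // true
    }
}
```

The SparseBinaryMatrix shown in the thread only has a (rows, columns) constructor, so it resembles the first sample class. Adding a public no-argument constructor would be the first change to try.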

