Posted to dev@flink.apache.org by "Kirschnick, Johannes" <jo...@tu-berlin.de> on 2015/02/24 11:13:45 UTC

Operating on Serialized Data

Hi list,


I have a general question: is it possible to significantly speed up processing by cutting down on serialization costs during iterations?


My basic setup is a couple of vectors that are repeatedly mutated (added and multiplied) as part of an iterative run within a reducer.

A vector is basically "just" an array of doubles - all of the same size.


I noticed during simple profiling that roughly 50% of the execution time is spent serializing the data using com.esotericsoftware.kryo.serializers.DefaultArraySerializers in Kryo.


I know that any custom operation would warrant custom processing, but given that serialization contributes such a large share of the overall runtime, it might very well be worthwhile.


Is that currently exposed in any fashion to the user code, or are there some hooks I could look into?


Thanks

Johannes

Re: Operating on Serialized Data

Posted by "Kirschnick, Johannes" <jo...@tu-berlin.de>.
Hi Max,

Thanks for the detailed answer.
That was exactly what I was looking for.
I switched the serialization from Kryo to the Value interface. With everything else kept constant, that basically halved the execution time. Nice.

One note: iterating over the array element by element works, but smells inefficient.
Would it make sense to read in bulk using some byte buffer? Or is that not needed because the DataInputView is backed by the memory manager, or would it even cause issues?

Johannes
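
On the bulk-reading question, one possible sketch follows. Flink's DataInputView and DataOutputView extend java.io.DataInput and java.io.DataOutput, so the example is written against those JDK interfaces to stay self-contained. It moves the whole array through a single byte[] instead of issuing one readDouble call per element. The class name BulkVectorIO and its two methods are hypothetical, and whether this is actually faster than the element-wise loop on Flink's memory-backed views is an assumption that would need to be measured.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; not part of Flink.
final class BulkVectorIO {

   // Writes the element count, then all doubles as one block of bytes.
   // ByteBuffer is big-endian by default, the same byte order that
   // DataOutput.writeDouble uses.
   static void write(DataOutput out, double[] values) throws IOException {
      ByteBuffer buffer = ByteBuffer.allocate(values.length * Double.BYTES);
      buffer.asDoubleBuffer().put(values);
      out.writeInt(values.length);
      out.write(buffer.array());
   }

   // Reads the element count, then fetches all bytes with one readFully
   // call and reinterprets them as doubles.
   static double[] read(DataInput in) throws IOException {
      int length = in.readInt();
      byte[] bytes = new byte[length * Double.BYTES];
      in.readFully(bytes);
      double[] values = new double[length];
      ByteBuffer.wrap(bytes).asDoubleBuffer().get(values);
      return values;
   }
}
```

Inside a Value implementation, write(DataOutputView) and read(DataInputView) could delegate to these two methods. The temporary byte[] adds a copy of its own, so the element-wise loop may well turn out to be just as fast.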

Re: Operating on Serialized Data

Posted by Max Michels <mx...@apache.org>.
Apparently, the mailing list doesn't allow attachments.

Here is the example with syntax highlighting:
https://gist.github.com/mxm/d1929b4b69dda87d5c37


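import java.io.IOException;
import java.util.Arrays;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;

public class CustomSerializer {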

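   // Implementing Value lets Flink serialize this type through the
   // write/read methods below instead of a generic serializer.
   public static class Vector implements Value {

      private transient double[] doubleValues;

      // Value types need an empty constructor so that instances can be
      // created before read() fills them in.
      public Vector() {
      }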

      public Vector(double[] doubleValues) {
         this.doubleValues = doubleValues;
      }

      public double getElement(int position) {
         return doubleValues[position];
      }

      public void setElement(double value, int position) {
         doubleValues[position] = value;
      }

      public void multiply(int factor) {
         for (int i = 0; i < doubleValues.length; i++) {
            doubleValues[i] *= factor;
         }
      }

      // Serialized layout: element count (int), then each element (double).
      @Override
      public void write(DataOutputView out) throws IOException {
         out.writeInt(doubleValues.length);
         for (double value : doubleValues) {
            out.writeDouble(value);
         }
      }

      // Mirrors write(): read the element count, then that many doubles.
      @Override
      public void read(DataInputView in) throws IOException {
         int length = in.readInt();
         double[] array = new double[length];
         for (int i = 0; i < length; i++) {
            array[i] = in.readDouble();
         }
         this.doubleValues = array;
      }

      @Override
      public String toString() {
         return "Vector{" +
               "doubleValues=" + Arrays.toString(doubleValues) +
               '}';
      }
   }

   public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      Vector[] vectorList = new Vector[1024];

      // create some sample data
      for (int v = 0; v < vectorList.length; v++) {
         double[] arr = new double[128];
         for (int i = 0; i < arr.length; i++) {
            arr[i] = i * 1.23 * v;
         }
         vectorList[v] = new Vector(arr);
      }

      // create data set
      DataSet<Vector> source = env.fromElements(vectorList);

      // multiply all vectors by 2
      DataSet<Vector> ds = source.map(new MapFunction<Vector, Vector>() {
         private static final long serialVersionUID = -1511665386949403921L;

         @Override
         public Vector map(Vector value) throws Exception {
            value.multiply(2);
            return value;
         }
      });

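      ds.print();

      // Note: in later Flink releases print() triggers execution itself,
      // in which case this explicit execute() call should be dropped.
      env.execute();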

   }
}



Re: Operating on Serialized Data

Posted by Max Michels <mx...@apache.org>.
Hi Johannes,

Thanks for your question. You can try to implement the Value interface
for your Vector POJO. The class has to have an empty constructor and
implement the write and read methods of the interface for
serialization.

Based on your description, I've implemented an example to demonstrate
the use of the Value interface. It would be interesting to hear from
you whether you could decrease the serialization time using this
serialization method.

Best regards,
Max

