You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Chesnay Schepler <ch...@fu-berlin.de> on 2014/08/27 20:34:01 UTC

Python API - Weird Performance Issue

Hello everyone,

This will be some kind of brainstorming question.

As some of you may know I am currently working on the Python API. The 
most crucial part here is how the data is exchanged between Java and Python.
Up to this point we used pipes for this, but switched recently to memory 
mapped files in hopes of increasing the (lacking) performance.

Early (simplified) prototypes (outside of Flink) showed that this would 
yield a significant increase. yet when i added the code to flink and ran 
a job, there was
no effect. like at all. two radically different schemes ran in /exactly/ 
the same time.

my conclusion was that code already in place (and not part of the 
prototypes) is responsible for this.
so i went ahead and modified the prototypes to use all relevant code 
from the Python API in order to narrow down the culprit. but this time, 
the performance increase was there.

Now here's the question: How can the /very same code/ perform so much 
worse when integrated into flink? if the code is not the problem, what 
could be it?

i spent a lot of time looking for that one line of code that cripples 
the performance, but I'm pretty much out of places to look.


Re: Python API - Weird Performance Issue

Posted by Chesnay Schepler <ch...@fu-berlin.de>.
havent tried tcp.

all i do is create a socket and use send/receive operations as some kind 
of semaphore. i dont even access the contents of the datagram.

On 10.9.2014 15:17, Stephan Ewen wrote:
> Maybe there is some quirk in the way you use the datagrams. Have you tried
> it through TCP sockets?
>
> On Wed, Sep 10, 2014 at 2:30 PM, Chesnay Schepler <
> chesnay.schepler@fu-berlin.de> wrote:
>
>> only the coordination is done via UDP.
>>
>> i agree with what you say about the loops; currently looking into using
>> FileLocks.
>>
>>
>> On 9.9.2014 11:33, Stephan Ewen wrote:
>>
>>> Hey!
>>>
>>> The UDP version is 25x slower? That's massive. Are you sending the records
>>> through that as well, or just the coordination?
>>>
>>> Regarding busy waiting loops: There has to be a better way to do that. It
>>> will behave utterly unpredictable. Once the python side does I/O, has a
>>> separate process or thread or goes asynchronously into a library
>>> (scikitlearn, numpy), the loop cannot be expected to stay at 5%.
>>>
>>> You have tested that with a job where both java and python side have some
>>> work to do. In case of a job where one side waits for the other, the
>>> waiting side will burn cycles like crazy. Then run it in parallel (#cores)
>>> and you may get executions where little more happens then the busy waiting
>>> loop burning cycles.
>>>
>>> Stephan
>>>
>>>
>>> On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler <
>>> chesnay.schepler@fu-berlin.de> wrote:
>>>
>>>   sorry for the late answer.
>>>> today i did a quick hack to replace the synchronization completely with
>>>> udp. its still synchronous and record based, but 25x slower.
>>>> regarding busy-loops i would propose the following:
>>>>
>>>> 1. leave the python side as it is. its doing most of the heavy lifting
>>>>      anyway and will run at 100% regardless of the loops. (the loops only
>>>>      take up 5% of the total runtime)
>>>> 2. once we exchange buffers instead of single records the IO operations
>>>>      and synchronization will take a fairly constant time. we could then
>>>>      put the java process to sleep manually for that time instead of
>>>>      waiting. it may not be as good as a blocking operation, but it
>>>>      should keep the cpu consumption down to some extent.
>>>>
>>>>
>>>> On 1.9.2014 22:50, Ufuk Celebi wrote:
>>>>
>>>>   Hey Chesnay,
>>>>> any progress on this today? Are you going for the UDP buffer
>>>>> availability
>>>>> notifications Stephan proposed instead of the busy loop?
>>>>>
>>>>> Ufuk
>>>>>
>>>>>
>>>>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <
>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>
>>>>>    the performance differences occur on the same system (16GB, 4 cores +
>>>>>
>>>>>> HyperThreading) with a DOP of 1 for a plan consisting of a single
>>>>>> operator.
>>>>>> plenty of resources :/
>>>>>>
>>>>>>
>>>>>> On 28.8.2014 0:50, Stephan Ewen wrote:
>>>>>>
>>>>>>    Hey Chesnay!
>>>>>>
>>>>>>> Here are some thoughts:
>>>>>>>
>>>>>>>      - The repeated checking for 1 or 0 is indeed a busy loop. These
>>>>>>> may
>>>>>>> behave
>>>>>>> very different in different settings. If you run the code isolated,
>>>>>>> you
>>>>>>> have a spare core for the thread and it barely hurts. Run multiple
>>>>>>> parallel
>>>>>>> instances in a larger framework, and it eats away CPU cycles from the
>>>>>>> threads that do the work - it starts hurting badly.
>>>>>>>
>>>>>>>      - You may get around a copy into the shared memory (ByteBuffer
>>>>>>> into
>>>>>>> MemoryMappedFile) by creating an according DataOutputView - save one
>>>>>>> more
>>>>>>> data copy. That's the next step, though, first solve the other issue.
>>>>>>>
>>>>>>> The last time I implemented such an inter-process data pipe between
>>>>>>> languages, I had a similar issue: No support for system wide
>>>>>>> semaphores
>>>>>>> (or
>>>>>>> something similar) on both sides.
>>>>>>>
>>>>>>> I used Shared memory for the buffers, and a local network socket (UDP,
>>>>>>> but
>>>>>>> I guess TCP would be fine as well) for notifications when buffers are
>>>>>>> available. That worked pretty well, yielded high throughput, because
>>>>>>> the
>>>>>>> big buffers were not copied (unlike in streams), and the UDP
>>>>>>> notifications
>>>>>>> were very fast (fire and forget datagrams).
>>>>>>>
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
>>>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>>>
>>>>>>>     Hey Stephan,
>>>>>>>
>>>>>>>   I'd like to point out right away that the code related to your
>>>>>>>> questions
>>>>>>>> is shared by both programs.
>>>>>>>>
>>>>>>>> regarding your first point: i have a byte[] into which i serialize
>>>>>>>> the
>>>>>>>> data first using a ByteBuffer, and then write that data to a
>>>>>>>> MappedByteBuffer.
>>>>>>>>
>>>>>>>> regarding synchronization: i couldn't find a way to use elaborate
>>>>>>>> things
>>>>>>>> like semaphores or similar that work between python and java alike.
>>>>>>>>
>>>>>>>> the data exchange is currently completely synchronous. java writes a
>>>>>>>> record, sets an "isWritten" bit and then repeatedly checks this bit
>>>>>>>> whether
>>>>>>>> it is 0. python repeatedly checks this bit whether it is 1. once that
>>>>>>>> happens, it reads the record, sets the bit to 0 which tells java that
>>>>>>>> it
>>>>>>>> has read the record and can write the next one. this scheme works the
>>>>>>>> same
>>>>>>>> way the other way around.
>>>>>>>>
>>>>>>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or
>>>>>>>> rather
>>>>>>>> should be...) way faster (5x) that what we had so far though
>>>>>>>> (asynchronous
>>>>>>>> pipes).
>>>>>>>> (i also tried different schemes that all had no effect, so i decided
>>>>>>>> to
>>>>>>>> stick with the easiest one)
>>>>>>>>
>>>>>>>> on to your last point: I'm gonna check for that tomorrow.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>>>>>>
>>>>>>>>     Hi Chesnay!
>>>>>>>>
>>>>>>>>   That is an interesting problem, though hard to judge with the
>>>>>>>>> information
>>>>>>>>> we have.
>>>>>>>>>
>>>>>>>>> Can you elaborate a bit on the following points:
>>>>>>>>>
>>>>>>>>>       - When putting the objects from the Java Flink side into the
>>>>>>>>> shared
>>>>>>>>> memory, you need to serialize them. How do you do that? Into a
>>>>>>>>> buffer,
>>>>>>>>> then
>>>>>>>>> copy that into the shared memory ByteBuffer? Directly?
>>>>>>>>>
>>>>>>>>>       - Shared memory access has to be somehow controlled. The pipes
>>>>>>>>> give
>>>>>>>>> you
>>>>>>>>> flow control for free (blocking write calls when the stream consumer
>>>>>>>>> is
>>>>>>>>> busy). What do you do for the shared memory? Usually, one uses
>>>>>>>>> semaphores,
>>>>>>>>> or, in java File(Range)Locks to coordinate access and block until
>>>>>>>>> memory
>>>>>>>>> regions are made available. Can you check if there are some busy
>>>>>>>>> waiting
>>>>>>>>> parts in you code?
>>>>>>>>>
>>>>>>>>>       - More general: The code is slower, but does it burn CPU
>>>>>>>>> cycles in
>>>>>>>>> its
>>>>>>>>> slowness or is it waiting for locks / monitors / conditions ?
>>>>>>>>>
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>>>>>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>>>>>
>>>>>>>>>      Hello everyone,
>>>>>>>>>
>>>>>>>>>    This will be some kind of brainstorming question.
>>>>>>>>>
>>>>>>>>>> As some of you may know I am currently working on the Python API.
>>>>>>>>>> The
>>>>>>>>>> most
>>>>>>>>>> crucial part here is how the data is exchanged between Java and
>>>>>>>>>> Python.
>>>>>>>>>> Up to this point we used pipes for this, but switched recently to
>>>>>>>>>> memory
>>>>>>>>>> mapped files in hopes of increasing the (lacking) performance.
>>>>>>>>>>
>>>>>>>>>> Early (simplified) prototypes (outside of Flink) showed that this
>>>>>>>>>> would
>>>>>>>>>> yield a significant increase. yet when i added the code to flink
>>>>>>>>>> and
>>>>>>>>>> ran
>>>>>>>>>> a
>>>>>>>>>> job, there was
>>>>>>>>>> no effect. like at all. two radically different schemes ran in
>>>>>>>>>> /exactly/
>>>>>>>>>> the same time.
>>>>>>>>>>
>>>>>>>>>> my conclusion was that code already in place (and not part of the
>>>>>>>>>> prototypes) is responsible for this.
>>>>>>>>>> so i went ahead and modified the prototypes to use all relevant
>>>>>>>>>> code
>>>>>>>>>> from
>>>>>>>>>> the Python API in order to narrow down the culprit. but this time,
>>>>>>>>>> the
>>>>>>>>>> performance increase was there.
>>>>>>>>>>
>>>>>>>>>> Now here's the question: How can the /very same code/ perform so
>>>>>>>>>> much
>>>>>>>>>> worse when integrated into flink? if the code is not the problem,
>>>>>>>>>> what
>>>>>>>>>> could be it?
>>>>>>>>>>
>>>>>>>>>> i spent a lot of time looking for that one line of code that
>>>>>>>>>> cripples
>>>>>>>>>> the
>>>>>>>>>> performance, but I'm pretty much out of places to look.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>


Re: Python API - Weird Performance Issue

Posted by Stephan Ewen <se...@apache.org>.
Maybe there is some quirk in the way you use the datagrams. Have you tried
it through TCP sockets?

On Wed, Sep 10, 2014 at 2:30 PM, Chesnay Schepler <
chesnay.schepler@fu-berlin.de> wrote:

> only the coordination is done via UDP.
>
> i agree with what you say about the loops; currently looking into using
> FileLocks.
>
>
> On 9.9.2014 11:33, Stephan Ewen wrote:
>
>> Hey!
>>
>> The UDP version is 25x slower? That's massive. Are you sending the records
>> through that as well, or just the coordination?
>>
>> Regarding busy waiting loops: There has to be a better way to do that. It
>> will behave utterly unpredictable. Once the python side does I/O, has a
>> separate process or thread or goes asynchronously into a library
>> (scikitlearn, numpy), the loop cannot be expected to stay at 5%.
>>
>> You have tested that with a job where both java and python side have some
>> work to do. In case of a job where one side waits for the other, the
>> waiting side will burn cycles like crazy. Then run it in parallel (#cores)
>> and you may get executions where little more happens then the busy waiting
>> loop burning cycles.
>>
>> Stephan
>>
>>
>> On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler <
>> chesnay.schepler@fu-berlin.de> wrote:
>>
>>  sorry for the late answer.
>>>
>>> today i did a quick hack to replace the synchronization completely with
>>> udp. its still synchronous and record based, but 25x slower.
>>> regarding busy-loops i would propose the following:
>>>
>>> 1. leave the python side as it is. its doing most of the heavy lifting
>>>     anyway and will run at 100% regardless of the loops. (the loops only
>>>     take up 5% of the total runtime)
>>> 2. once we exchange buffers instead of single records the IO operations
>>>     and synchronization will take a fairly constant time. we could then
>>>     put the java process to sleep manually for that time instead of
>>>     waiting. it may not be as good as a blocking operation, but it
>>>     should keep the cpu consumption down to some extent.
>>>
>>>
>>> On 1.9.2014 22:50, Ufuk Celebi wrote:
>>>
>>>  Hey Chesnay,
>>>>
>>>> any progress on this today? Are you going for the UDP buffer
>>>> availability
>>>> notifications Stephan proposed instead of the busy loop?
>>>>
>>>> Ufuk
>>>>
>>>>
>>>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <
>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>
>>>>   the performance differences occur on the same system (16GB, 4 cores +
>>>>
>>>>> HyperThreading) with a DOP of 1 for a plan consisting of a single
>>>>> operator.
>>>>> plenty of resources :/
>>>>>
>>>>>
>>>>> On 28.8.2014 0:50, Stephan Ewen wrote:
>>>>>
>>>>>   Hey Chesnay!
>>>>>
>>>>>> Here are some thoughts:
>>>>>>
>>>>>>     - The repeated checking for 1 or 0 is indeed a busy loop. These
>>>>>> may
>>>>>> behave
>>>>>> very different in different settings. If you run the code isolated,
>>>>>> you
>>>>>> have a spare core for the thread and it barely hurts. Run multiple
>>>>>> parallel
>>>>>> instances in a larger framework, and it eats away CPU cycles from the
>>>>>> threads that do the work - it starts hurting badly.
>>>>>>
>>>>>>     - You may get around a copy into the shared memory (ByteBuffer
>>>>>> into
>>>>>> MemoryMappedFile) by creating an according DataOutputView - save one
>>>>>> more
>>>>>> data copy. That's the next step, though, first solve the other issue.
>>>>>>
>>>>>> The last time I implemented such an inter-process data pipe between
>>>>>> languages, I had a similar issue: No support for system wide
>>>>>> semaphores
>>>>>> (or
>>>>>> something similar) on both sides.
>>>>>>
>>>>>> I used Shared memory for the buffers, and a local network socket (UDP,
>>>>>> but
>>>>>> I guess TCP would be fine as well) for notifications when buffers are
>>>>>> available. That worked pretty well, yielded high throughput, because
>>>>>> the
>>>>>> big buffers were not copied (unlike in streams), and the UDP
>>>>>> notifications
>>>>>> were very fast (fire and forget datagrams).
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
>>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>>
>>>>>>    Hey Stephan,
>>>>>>
>>>>>>  I'd like to point out right away that the code related to your
>>>>>>> questions
>>>>>>> is shared by both programs.
>>>>>>>
>>>>>>> regarding your first point: i have a byte[] into which i serialize
>>>>>>> the
>>>>>>> data first using a ByteBuffer, and then write that data to a
>>>>>>> MappedByteBuffer.
>>>>>>>
>>>>>>> regarding synchronization: i couldn't find a way to use elaborate
>>>>>>> things
>>>>>>> like semaphores or similar that work between python and java alike.
>>>>>>>
>>>>>>> the data exchange is currently completely synchronous. java writes a
>>>>>>> record, sets an "isWritten" bit and then repeatedly checks this bit
>>>>>>> whether
>>>>>>> it is 0. python repeatedly checks this bit whether it is 1. once that
>>>>>>> happens, it reads the record, sets the bit to 0 which tells java that
>>>>>>> it
>>>>>>> has read the record and can write the next one. this scheme works the
>>>>>>> same
>>>>>>> way the other way around.
>>>>>>>
>>>>>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or
>>>>>>> rather
>>>>>>> should be...) way faster (5x) that what we had so far though
>>>>>>> (asynchronous
>>>>>>> pipes).
>>>>>>> (i also tried different schemes that all had no effect, so i decided
>>>>>>> to
>>>>>>> stick with the easiest one)
>>>>>>>
>>>>>>> on to your last point: I'm gonna check for that tomorrow.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>>>>>
>>>>>>>    Hi Chesnay!
>>>>>>>
>>>>>>>  That is an interesting problem, though hard to judge with the
>>>>>>>> information
>>>>>>>> we have.
>>>>>>>>
>>>>>>>> Can you elaborate a bit on the following points:
>>>>>>>>
>>>>>>>>      - When putting the objects from the Java Flink side into the
>>>>>>>> shared
>>>>>>>> memory, you need to serialize them. How do you do that? Into a
>>>>>>>> buffer,
>>>>>>>> then
>>>>>>>> copy that into the shared memory ByteBuffer? Directly?
>>>>>>>>
>>>>>>>>      - Shared memory access has to be somehow controlled. The pipes
>>>>>>>> give
>>>>>>>> you
>>>>>>>> flow control for free (blocking write calls when the stream consumer
>>>>>>>> is
>>>>>>>> busy). What do you do for the shared memory? Usually, one uses
>>>>>>>> semaphores,
>>>>>>>> or, in java File(Range)Locks to coordinate access and block until
>>>>>>>> memory
>>>>>>>> regions are made available. Can you check if there are some busy
>>>>>>>> waiting
>>>>>>>> parts in you code?
>>>>>>>>
>>>>>>>>      - More general: The code is slower, but does it burn CPU
>>>>>>>> cycles in
>>>>>>>> its
>>>>>>>> slowness or is it waiting for locks / monitors / conditions ?
>>>>>>>>
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>>>>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>>>>
>>>>>>>>     Hello everyone,
>>>>>>>>
>>>>>>>>   This will be some kind of brainstorming question.
>>>>>>>>
>>>>>>>>> As some of you may know I am currently working on the Python API.
>>>>>>>>> The
>>>>>>>>> most
>>>>>>>>> crucial part here is how the data is exchanged between Java and
>>>>>>>>> Python.
>>>>>>>>> Up to this point we used pipes for this, but switched recently to
>>>>>>>>> memory
>>>>>>>>> mapped files in hopes of increasing the (lacking) performance.
>>>>>>>>>
>>>>>>>>> Early (simplified) prototypes (outside of Flink) showed that this
>>>>>>>>> would
>>>>>>>>> yield a significant increase. yet when i added the code to flink
>>>>>>>>> and
>>>>>>>>> ran
>>>>>>>>> a
>>>>>>>>> job, there was
>>>>>>>>> no effect. like at all. two radically different schemes ran in
>>>>>>>>> /exactly/
>>>>>>>>> the same time.
>>>>>>>>>
>>>>>>>>> my conclusion was that code already in place (and not part of the
>>>>>>>>> prototypes) is responsible for this.
>>>>>>>>> so i went ahead and modified the prototypes to use all relevant
>>>>>>>>> code
>>>>>>>>> from
>>>>>>>>> the Python API in order to narrow down the culprit. but this time,
>>>>>>>>> the
>>>>>>>>> performance increase was there.
>>>>>>>>>
>>>>>>>>> Now here's the question: How can the /very same code/ perform so
>>>>>>>>> much
>>>>>>>>> worse when integrated into flink? if the code is not the problem,
>>>>>>>>> what
>>>>>>>>> could be it?
>>>>>>>>>
>>>>>>>>> i spent a lot of time looking for that one line of code that
>>>>>>>>> cripples
>>>>>>>>> the
>>>>>>>>> performance, but I'm pretty much out of places to look.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>

Re: Python API - Weird Performance Issue

Posted by Chesnay Schepler <ch...@fu-berlin.de>.
only the coordination is done via UDP.

i agree with what you say about the loops; currently looking into using 
FileLocks.

On 9.9.2014 11:33, Stephan Ewen wrote:
> Hey!
>
> The UDP version is 25x slower? That's massive. Are you sending the records
> through that as well, or just the coordination?
>
> Regarding busy waiting loops: There has to be a better way to do that. It
> will behave utterly unpredictable. Once the python side does I/O, has a
> separate process or thread or goes asynchronously into a library
> (scikitlearn, numpy), the loop cannot be expected to stay at 5%.
>
> You have tested that with a job where both java and python side have some
> work to do. In case of a job where one side waits for the other, the
> waiting side will burn cycles like crazy. Then run it in parallel (#cores)
> and you may get executions where little more happens then the busy waiting
> loop burning cycles.
>
> Stephan
>
>
> On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler <
> chesnay.schepler@fu-berlin.de> wrote:
>
>> sorry for the late answer.
>>
>> today i did a quick hack to replace the synchronization completely with
>> udp. its still synchronous and record based, but 25x slower.
>> regarding busy-loops i would propose the following:
>>
>> 1. leave the python side as it is. its doing most of the heavy lifting
>>     anyway and will run at 100% regardless of the loops. (the loops only
>>     take up 5% of the total runtime)
>> 2. once we exchange buffers instead of single records the IO operations
>>     and synchronization will take a fairly constant time. we could then
>>     put the java process to sleep manually for that time instead of
>>     waiting. it may not be as good as a blocking operation, but it
>>     should keep the cpu consumption down to some extent.
>>
>>
>> On 1.9.2014 22:50, Ufuk Celebi wrote:
>>
>>> Hey Chesnay,
>>>
>>> any progress on this today? Are you going for the UDP buffer availability
>>> notifications Stephan proposed instead of the busy loop?
>>>
>>> Ufuk
>>>
>>>
>>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <
>>> chesnay.schepler@fu-berlin.de> wrote:
>>>
>>>   the performance differences occur on the same system (16GB, 4 cores +
>>>> HyperThreading) with a DOP of 1 for a plan consisting of a single
>>>> operator.
>>>> plenty of resources :/
>>>>
>>>>
>>>> On 28.8.2014 0:50, Stephan Ewen wrote:
>>>>
>>>>   Hey Chesnay!
>>>>> Here are some thoughts:
>>>>>
>>>>>     - The repeated checking for 1 or 0 is indeed a busy loop. These may
>>>>> behave
>>>>> very different in different settings. If you run the code isolated, you
>>>>> have a spare core for the thread and it barely hurts. Run multiple
>>>>> parallel
>>>>> instances in a larger framework, and it eats away CPU cycles from the
>>>>> threads that do the work - it starts hurting badly.
>>>>>
>>>>>     - You may get around a copy into the shared memory (ByteBuffer into
>>>>> MemoryMappedFile) by creating an according DataOutputView - save one
>>>>> more
>>>>> data copy. That's the next step, though, first solve the other issue.
>>>>>
>>>>> The last time I implemented such an inter-process data pipe between
>>>>> languages, I had a similar issue: No support for system wide semaphores
>>>>> (or
>>>>> something similar) on both sides.
>>>>>
>>>>> I used Shared memory for the buffers, and a local network socket (UDP,
>>>>> but
>>>>> I guess TCP would be fine as well) for notifications when buffers are
>>>>> available. That worked pretty well, yielded high throughput, because the
>>>>> big buffers were not copied (unlike in streams), and the UDP
>>>>> notifications
>>>>> were very fast (fire and forget datagrams).
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>
>>>>>    Hey Stephan,
>>>>>
>>>>>> I'd like to point out right away that the code related to your
>>>>>> questions
>>>>>> is shared by both programs.
>>>>>>
>>>>>> regarding your first point: i have a byte[] into which i serialize the
>>>>>> data first using a ByteBuffer, and then write that data to a
>>>>>> MappedByteBuffer.
>>>>>>
>>>>>> regarding synchronization: i couldn't find a way to use elaborate
>>>>>> things
>>>>>> like semaphores or similar that work between python and java alike.
>>>>>>
>>>>>> the data exchange is currently completely synchronous. java writes a
>>>>>> record, sets an "isWritten" bit and then repeatedly checks this bit
>>>>>> whether
>>>>>> it is 0. python repeatedly checks this bit whether it is 1. once that
>>>>>> happens, it reads the record, sets the bit to 0 which tells java that
>>>>>> it
>>>>>> has read the record and can write the next one. this scheme works the
>>>>>> same
>>>>>> way the other way around.
>>>>>>
>>>>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or
>>>>>> rather
>>>>>> should be...) way faster (5x) that what we had so far though
>>>>>> (asynchronous
>>>>>> pipes).
>>>>>> (i also tried different schemes that all had no effect, so i decided to
>>>>>> stick with the easiest one)
>>>>>>
>>>>>> on to your last point: I'm gonna check for that tomorrow.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>>>>
>>>>>>    Hi Chesnay!
>>>>>>
>>>>>>> That is an interesting problem, though hard to judge with the
>>>>>>> information
>>>>>>> we have.
>>>>>>>
>>>>>>> Can you elaborate a bit on the following points:
>>>>>>>
>>>>>>>      - When putting the objects from the Java Flink side into the
>>>>>>> shared
>>>>>>> memory, you need to serialize them. How do you do that? Into a buffer,
>>>>>>> then
>>>>>>> copy that into the shared memory ByteBuffer? Directly?
>>>>>>>
>>>>>>>      - Shared memory access has to be somehow controlled. The pipes
>>>>>>> give
>>>>>>> you
>>>>>>> flow control for free (blocking write calls when the stream consumer
>>>>>>> is
>>>>>>> busy). What do you do for the shared memory? Usually, one uses
>>>>>>> semaphores,
>>>>>>> or, in java File(Range)Locks to coordinate access and block until
>>>>>>> memory
>>>>>>> regions are made available. Can you check if there are some busy
>>>>>>> waiting
>>>>>>> parts in you code?
>>>>>>>
>>>>>>>      - More general: The code is slower, but does it burn CPU cycles in
>>>>>>> its
>>>>>>> slowness or is it waiting for locks / monitors / conditions ?
>>>>>>>
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>>>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>>>
>>>>>>>     Hello everyone,
>>>>>>>
>>>>>>>   This will be some kind of brainstorming question.
>>>>>>>> As some of you may know I am currently working on the Python API. The
>>>>>>>> most
>>>>>>>> crucial part here is how the data is exchanged between Java and
>>>>>>>> Python.
>>>>>>>> Up to this point we used pipes for this, but switched recently to
>>>>>>>> memory
>>>>>>>> mapped files in hopes of increasing the (lacking) performance.
>>>>>>>>
>>>>>>>> Early (simplified) prototypes (outside of Flink) showed that this
>>>>>>>> would
>>>>>>>> yield a significant increase. yet when i added the code to flink and
>>>>>>>> ran
>>>>>>>> a
>>>>>>>> job, there was
>>>>>>>> no effect. like at all. two radically different schemes ran in
>>>>>>>> /exactly/
>>>>>>>> the same time.
>>>>>>>>
>>>>>>>> my conclusion was that code already in place (and not part of the
>>>>>>>> prototypes) is responsible for this.
>>>>>>>> so i went ahead and modified the prototypes to use all relevant code
>>>>>>>> from
>>>>>>>> the Python API in order to narrow down the culprit. but this time,
>>>>>>>> the
>>>>>>>> performance increase was there.
>>>>>>>>
>>>>>>>> Now here's the question: How can the /very same code/ perform so much
>>>>>>>> worse when integrated into flink? if the code is not the problem,
>>>>>>>> what
>>>>>>>> could be it?
>>>>>>>>
>>>>>>>> i spent a lot of time looking for that one line of code that cripples
>>>>>>>> the
>>>>>>>> performance, but I'm pretty much out of places to look.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>


Re: Python API - Weird Performance Issue

Posted by Stephan Ewen <se...@apache.org>.
Hey!

The UDP version is 25x slower? That's massive. Are you sending the records
through that as well, or just the coordination?

Regarding busy waiting loops: There has to be a better way to do that. It
will behave utterly unpredictable. Once the python side does I/O, has a
separate process or thread or goes asynchronously into a library
(scikitlearn, numpy), the loop cannot be expected to stay at 5%.

You have tested that with a job where both java and python side have some
work to do. In case of a job where one side waits for the other, the
waiting side will burn cycles like crazy. Then run it in parallel (#cores)
and you may get executions where little more happens then the busy waiting
loop burning cycles.

Stephan


On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler <
chesnay.schepler@fu-berlin.de> wrote:

> sorry for the late answer.
>
> today i did a quick hack to replace the synchronization completely with
> udp. its still synchronous and record based, but 25x slower.
> regarding busy-loops i would propose the following:
>
> 1. leave the python side as it is. its doing most of the heavy lifting
>    anyway and will run at 100% regardless of the loops. (the loops only
>    take up 5% of the total runtime)
> 2. once we exchange buffers instead of single records the IO operations
>    and synchronization will take a fairly constant time. we could then
>    put the java process to sleep manually for that time instead of
>    waiting. it may not be as good as a blocking operation, but it
>    should keep the cpu consumption down to some extent.
>
>
> On 1.9.2014 22:50, Ufuk Celebi wrote:
>
>> Hey Chesnay,
>>
>> any progress on this today? Are you going for the UDP buffer availability
>> notifications Stephan proposed instead of the busy loop?
>>
>> Ufuk
>>
>>
>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <
>> chesnay.schepler@fu-berlin.de> wrote:
>>
>>  the performance differences occur on the same system (16GB, 4 cores +
>>> HyperThreading) with a DOP of 1 for a plan consisting of a single
>>> operator.
>>> plenty of resources :/
>>>
>>>
>>> On 28.8.2014 0:50, Stephan Ewen wrote:
>>>
>>>  Hey Chesnay!
>>>>
>>>> Here are some thoughts:
>>>>
>>>>    - The repeated checking for 1 or 0 is indeed a busy loop. These may
>>>> behave
>>>> very different in different settings. If you run the code isolated, you
>>>> have a spare core for the thread and it barely hurts. Run multiple
>>>> parallel
>>>> instances in a larger framework, and it eats away CPU cycles from the
>>>> threads that do the work - it starts hurting badly.
>>>>
>>>>    - You may get around a copy into the shared memory (ByteBuffer into
>>>> MemoryMappedFile) by creating an according DataOutputView - save one
>>>> more
>>>> data copy. That's the next step, though, first solve the other issue.
>>>>
>>>> The last time I implemented such an inter-process data pipe between
>>>> languages, I had a similar issue: No support for system wide semaphores
>>>> (or
>>>> something similar) on both sides.
>>>>
>>>> I used Shared memory for the buffers, and a local network socket (UDP,
>>>> but
>>>> I guess TCP would be fine as well) for notifications when buffers are
>>>> available. That worked pretty well, yielded high throughput, because the
>>>> big buffers were not copied (unlike in streams), and the UDP
>>>> notifications
>>>> were very fast (fire and forget datagrams).
>>>>
>>>> Stephan
>>>>
>>>>
>>>>
>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>
>>>>   Hey Stephan,
>>>>
>>>>> I'd like to point out right away that the code related to your
>>>>> questions
>>>>> is shared by both programs.
>>>>>
>>>>> regarding your first point: i have a byte[] into which i serialize the
>>>>> data first using a ByteBuffer, and then write that data to a
>>>>> MappedByteBuffer.
>>>>>
>>>>> regarding synchronization: i couldn't find a way to use elaborate
>>>>> things
>>>>> like semaphores or similar that work between python and java alike.
>>>>>
>>>>> the data exchange is currently completely synchronous. java writes a
>>>>> record, sets an "isWritten" bit and then repeatedly checks this bit
>>>>> whether
>>>>> it is 0. python repeatedly checks this bit whether it is 1. once that
>>>>> happens, it reads the record, sets the bit to 0 which tells java that
>>>>> it
>>>>> has read the record and can write the next one. this scheme works the
>>>>> same
>>>>> way the other way around.
>>>>>
>>>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or
>>>>> rather
>>>>> should be...) way faster (5x) that what we had so far though
>>>>> (asynchronous
>>>>> pipes).
>>>>> (i also tried different schemes that all had no effect, so i decided to
>>>>> stick with the easiest one)
>>>>>
>>>>> on to your last point: I'm gonna check for that tomorrow.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>>>
>>>>>   Hi Chesnay!
>>>>>
>>>>>> That is an interesting problem, though hard to judge with the
>>>>>> information
>>>>>> we have.
>>>>>>
>>>>>> Can you elaborate a bit on the following points:
>>>>>>
>>>>>>     - When putting the objects from the Java Flink side into the
>>>>>> shared
>>>>>> memory, you need to serialize them. How do you do that? Into a buffer,
>>>>>> then
>>>>>> copy that into the shared memory ByteBuffer? Directly?
>>>>>>
>>>>>>     - Shared memory access has to be somehow controlled. The pipes
>>>>>> give
>>>>>> you
>>>>>> flow control for free (blocking write calls when the stream consumer
>>>>>> is
>>>>>> busy). What do you do for the shared memory? Usually, one uses
>>>>>> semaphores,
>>>>>> or, in java File(Range)Locks to coordinate access and block until
>>>>>> memory
>>>>>> regions are made available. Can you check if there are some busy
>>>>>> waiting
>>>>>> parts in you code?
>>>>>>
>>>>>>     - More general: The code is slower, but does it burn CPU cycles in
>>>>>> its
>>>>>> slowness or is it waiting for locks / monitors / conditions ?
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>>
>>>>>>    Hello everyone,
>>>>>>
>>>>>>  This will be some kind of brainstorming question.
>>>>>>>
>>>>>>> As some of you may know I am currently working on the Python API. The
>>>>>>> most
>>>>>>> crucial part here is how the data is exchanged between Java and
>>>>>>> Python.
>>>>>>> Up to this point we used pipes for this, but switched recently to
>>>>>>> memory
>>>>>>> mapped files in hopes of increasing the (lacking) performance.
>>>>>>>
>>>>>>> Early (simplified) prototypes (outside of Flink) showed that this
>>>>>>> would
>>>>>>> yield a significant increase. yet when i added the code to flink and
>>>>>>> ran
>>>>>>> a
>>>>>>> job, there was
>>>>>>> no effect. like at all. two radically different schemes ran in
>>>>>>> /exactly/
>>>>>>> the same time.
>>>>>>>
>>>>>>> my conclusion was that code already in place (and not part of the
>>>>>>> prototypes) is responsible for this.
>>>>>>> so i went ahead and modified the prototypes to use all relevant code
>>>>>>> from
>>>>>>> the Python API in order to narrow down the culprit. but this time,
>>>>>>> the
>>>>>>> performance increase was there.
>>>>>>>
>>>>>>> Now here's the question: How can the /very same code/ perform so much
>>>>>>> worse when integrated into flink? if the code is not the problem,
>>>>>>> what
>>>>>>> could be it?
>>>>>>>
>>>>>>> i spent a lot of time looking for that one line of code that cripples
>>>>>>> the
>>>>>>> performance, but I'm pretty much out of places to look.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>

Re: Python API - Weird Performance Issue

Posted by Chesnay Schepler <ch...@fu-berlin.de>.
sorry for the late answer.

today i did a quick hack to replace the synchronization completely with 
udp. its still synchronous and record based, but 25x slower.
regarding busy-loops i would propose the following:

 1. leave the python side as it is. its doing most of the heavy lifting
    anyway and will run at 100% regardless of the loops. (the loops only
    take up 5% of the total runtime)
 2. once we exchange buffers instead of single records the IO operations
    and synchronization will take a fairly constant time. we could then
    put the java process to sleep manually for that time instead of
    waiting. it may not be as good as a blocking operation, but it
    should keep the cpu consumption down to some extent.

On 1.9.2014 22:50, Ufuk Celebi wrote:
> Hey Chesnay,
>
> any progress on this today? Are you going for the UDP buffer availability
> notifications Stephan proposed instead of the busy loop?
>
> Ufuk
>
>
> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <
> chesnay.schepler@fu-berlin.de> wrote:
>
>> the performance differences occur on the same system (16GB, 4 cores +
>> HyperThreading) with a DOP of 1 for a plan consisting of a single operator.
>> plenty of resources :/
>>
>>
>> On 28.8.2014 0:50, Stephan Ewen wrote:
>>
>>> Hey Chesnay!
>>>
>>> Here are some thoughts:
>>>
>>>    - The repeated checking for 1 or 0 is indeed a busy loop. These may
>>> behave
>>> very different in different settings. If you run the code isolated, you
>>> have a spare core for the thread and it barely hurts. Run multiple
>>> parallel
>>> instances in a larger framework, and it eats away CPU cycles from the
>>> threads that do the work - it starts hurting badly.
>>>
>>>    - You may get around a copy into the shared memory (ByteBuffer into
>>> MemoryMappedFile) by creating an according DataOutputView - save one more
>>> data copy. That's the next step, though, first solve the other issue.
>>>
>>> The last time I implemented such an inter-process data pipe between
>>> languages, I had a similar issue: No support for system wide semaphores
>>> (or
>>> something similar) on both sides.
>>>
>>> I used Shared memory for the buffers, and a local network socket (UDP, but
>>> I guess TCP would be fine as well) for notifications when buffers are
>>> available. That worked pretty well, yielded high throughput, because the
>>> big buffers were not copied (unlike in streams), and the UDP notifications
>>> were very fast (fire and forget datagrams).
>>>
>>> Stephan
>>>
>>>
>>>
>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
>>> chesnay.schepler@fu-berlin.de> wrote:
>>>
>>>   Hey Stephan,
>>>> I'd like to point out right away that the code related to your questions
>>>> is shared by both programs.
>>>>
>>>> regarding your first point: i have a byte[] into which i serialize the
>>>> data first using a ByteBuffer, and then write that data to a
>>>> MappedByteBuffer.
>>>>
>>>> regarding synchronization: i couldn't find a way to use elaborate things
>>>> like semaphores or similar that work between python and java alike.
>>>>
>>>> the data exchange is currently completely synchronous. java writes a
>>>> record, sets an "isWritten" bit and then repeatedly checks this bit
>>>> whether
>>>> it is 0. python repeatedly checks this bit whether it is 1. once that
>>>> happens, it reads the record, sets the bit to 0 which tells java that it
>>>> has read the record and can write the next one. this scheme works the
>>>> same
>>>> way the other way around.
>>>>
>>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or
>>>> rather
>>>> should be...) way faster (5x) that what we had so far though
>>>> (asynchronous
>>>> pipes).
>>>> (i also tried different schemes that all had no effect, so i decided to
>>>> stick with the easiest one)
>>>>
>>>> on to your last point: I'm gonna check for that tomorrow.
>>>>
>>>>
>>>>
>>>>
>>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>>
>>>>   Hi Chesnay!
>>>>> That is an interesting problem, though hard to judge with the
>>>>> information
>>>>> we have.
>>>>>
>>>>> Can you elaborate a bit on the following points:
>>>>>
>>>>>     - When putting the objects from the Java Flink side into the shared
>>>>> memory, you need to serialize them. How do you do that? Into a buffer,
>>>>> then
>>>>> copy that into the shared memory ByteBuffer? Directly?
>>>>>
>>>>>     - Shared memory access has to be somehow controlled. The pipes give
>>>>> you
>>>>> flow control for free (blocking write calls when the stream consumer is
>>>>> busy). What do you do for the shared memory? Usually, one uses
>>>>> semaphores,
>>>>> or, in java File(Range)Locks to coordinate access and block until memory
>>>>> regions are made available. Can you check if there are some busy waiting
>>>>> parts in you code?
>>>>>
>>>>>     - More general: The code is slower, but does it burn CPU cycles in
>>>>> its
>>>>> slowness or is it waiting for locks / monitors / conditions ?
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>
>>>>>    Hello everyone,
>>>>>
>>>>>> This will be some kind of brainstorming question.
>>>>>>
>>>>>> As some of you may know I am currently working on the Python API. The
>>>>>> most
>>>>>> crucial part here is how the data is exchanged between Java and Python.
>>>>>> Up to this point we used pipes for this, but switched recently to
>>>>>> memory
>>>>>> mapped files in hopes of increasing the (lacking) performance.
>>>>>>
>>>>>> Early (simplified) prototypes (outside of Flink) showed that this would
>>>>>> yield a significant increase. yet when i added the code to flink and
>>>>>> ran
>>>>>> a
>>>>>> job, there was
>>>>>> no effect. like at all. two radically different schemes ran in
>>>>>> /exactly/
>>>>>> the same time.
>>>>>>
>>>>>> my conclusion was that code already in place (and not part of the
>>>>>> prototypes) is responsible for this.
>>>>>> so i went ahead and modified the prototypes to use all relevant code
>>>>>> from
>>>>>> the Python API in order to narrow down the culprit. but this time, the
>>>>>> performance increase was there.
>>>>>>
>>>>>> Now here's the question: How can the /very same code/ perform so much
>>>>>> worse when integrated into flink? if the code is not the problem, what
>>>>>> could be it?
>>>>>>
>>>>>> i spent a lot of time looking for that one line of code that cripples
>>>>>> the
>>>>>> performance, but I'm pretty much out of places to look.
>>>>>>
>>>>>>
>>>>>>
>>>>>>


Re: Python API - Weird Performance Issue

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Chesnay,

any progress on this today? Are you going for the UDP buffer availability
notifications Stephan proposed instead of the busy loop?

Ufuk


On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <
chesnay.schepler@fu-berlin.de> wrote:

> the performance differences occur on the same system (16GB, 4 cores +
> HyperThreading) with a DOP of 1 for a plan consisting of a single operator.
> plenty of resources :/
>
>
> On 28.8.2014 0:50, Stephan Ewen wrote:
>
>> Hey Chesnay!
>>
>> Here are some thoughts:
>>
>>   - The repeated checking for 1 or 0 is indeed a busy loop. These may
>> behave
>> very different in different settings. If you run the code isolated, you
>> have a spare core for the thread and it barely hurts. Run multiple
>> parallel
>> instances in a larger framework, and it eats away CPU cycles from the
>> threads that do the work - it starts hurting badly.
>>
>>   - You may get around a copy into the shared memory (ByteBuffer into
>> MemoryMappedFile) by creating an according DataOutputView - save one more
>> data copy. That's the next step, though, first solve the other issue.
>>
>> The last time I implemented such an inter-process data pipe between
>> languages, I had a similar issue: No support for system wide semaphores
>> (or
>> something similar) on both sides.
>>
>> I used Shared memory for the buffers, and a local network socket (UDP, but
>> I guess TCP would be fine as well) for notifications when buffers are
>> available. That worked pretty well, yielded high throughput, because the
>> big buffers were not copied (unlike in streams), and the UDP notifications
>> were very fast (fire and forget datagrams).
>>
>> Stephan
>>
>>
>>
>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
>> chesnay.schepler@fu-berlin.de> wrote:
>>
>>  Hey Stephan,
>>>
>>> I'd like to point out right away that the code related to your questions
>>> is shared by both programs.
>>>
>>> regarding your first point: i have a byte[] into which i serialize the
>>> data first using a ByteBuffer, and then write that data to a
>>> MappedByteBuffer.
>>>
>>> regarding synchronization: i couldn't find a way to use elaborate things
>>> like semaphores or similar that work between python and java alike.
>>>
>>> the data exchange is currently completely synchronous. java writes a
>>> record, sets an "isWritten" bit and then repeatedly checks this bit
>>> whether
>>> it is 0. python repeatedly checks this bit whether it is 1. once that
>>> happens, it reads the record, sets the bit to 0 which tells java that it
>>> has read the record and can write the next one. this scheme works the
>>> same
>>> way the other way around.
>>>
>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or
>>> rather
>>> should be...) way faster (5x) that what we had so far though
>>> (asynchronous
>>> pipes).
>>> (i also tried different schemes that all had no effect, so i decided to
>>> stick with the easiest one)
>>>
>>> on to your last point: I'm gonna check for that tomorrow.
>>>
>>>
>>>
>>>
>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>
>>>  Hi Chesnay!
>>>>
>>>> That is an interesting problem, though hard to judge with the
>>>> information
>>>> we have.
>>>>
>>>> Can you elaborate a bit on the following points:
>>>>
>>>>    - When putting the objects from the Java Flink side into the shared
>>>> memory, you need to serialize them. How do you do that? Into a buffer,
>>>> then
>>>> copy that into the shared memory ByteBuffer? Directly?
>>>>
>>>>    - Shared memory access has to be somehow controlled. The pipes give
>>>> you
>>>> flow control for free (blocking write calls when the stream consumer is
>>>> busy). What do you do for the shared memory? Usually, one uses
>>>> semaphores,
>>>> or, in java File(Range)Locks to coordinate access and block until memory
>>>> regions are made available. Can you check if there are some busy waiting
>>>> parts in you code?
>>>>
>>>>    - More general: The code is slower, but does it burn CPU cycles in
>>>> its
>>>> slowness or is it waiting for locks / monitors / conditions ?
>>>>
>>>> Stephan
>>>>
>>>>
>>>>
>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>
>>>>   Hello everyone,
>>>>
>>>>> This will be some kind of brainstorming question.
>>>>>
>>>>> As some of you may know I am currently working on the Python API. The
>>>>> most
>>>>> crucial part here is how the data is exchanged between Java and Python.
>>>>> Up to this point we used pipes for this, but switched recently to
>>>>> memory
>>>>> mapped files in hopes of increasing the (lacking) performance.
>>>>>
>>>>> Early (simplified) prototypes (outside of Flink) showed that this would
>>>>> yield a significant increase. yet when i added the code to flink and
>>>>> ran
>>>>> a
>>>>> job, there was
>>>>> no effect. like at all. two radically different schemes ran in
>>>>> /exactly/
>>>>> the same time.
>>>>>
>>>>> my conclusion was that code already in place (and not part of the
>>>>> prototypes) is responsible for this.
>>>>> so i went ahead and modified the prototypes to use all relevant code
>>>>> from
>>>>> the Python API in order to narrow down the culprit. but this time, the
>>>>> performance increase was there.
>>>>>
>>>>> Now here's the question: How can the /very same code/ perform so much
>>>>> worse when integrated into flink? if the code is not the problem, what
>>>>> could be it?
>>>>>
>>>>> i spent a lot of time looking for that one line of code that cripples
>>>>> the
>>>>> performance, but I'm pretty much out of places to look.
>>>>>
>>>>>
>>>>>
>>>>>
>

Re: Python API - Weird Performance Issue

Posted by Chesnay Schepler <ch...@fu-berlin.de>.
the performance differences occur on the same system (16GB, 4 cores + 
HyperThreading) with a DOP of 1 for a plan consisting of a single 
operator. plenty of resources :/

On 28.8.2014 0:50, Stephan Ewen wrote:
> Hey Chesnay!
>
> Here are some thoughts:
>
>   - The repeated checking for 1 or 0 is indeed a busy loop. These may behave
> very different in different settings. If you run the code isolated, you
> have a spare core for the thread and it barely hurts. Run multiple parallel
> instances in a larger framework, and it eats away CPU cycles from the
> threads that do the work - it starts hurting badly.
>
>   - You may get around a copy into the shared memory (ByteBuffer into
> MemoryMappedFile) by creating an according DataOutputView - save one more
> data copy. That's the next step, though, first solve the other issue.
>
> The last time I implemented such an inter-process data pipe between
> languages, I had a similar issue: No support for system wide semaphores (or
> something similar) on both sides.
>
> I used Shared memory for the buffers, and a local network socket (UDP, but
> I guess TCP would be fine as well) for notifications when buffers are
> available. That worked pretty well, yielded high throughput, because the
> big buffers were not copied (unlike in streams), and the UDP notifications
> were very fast (fire and forget datagrams).
>
> Stephan
>
>
>
> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
> chesnay.schepler@fu-berlin.de> wrote:
>
>> Hey Stephan,
>>
>> I'd like to point out right away that the code related to your questions
>> is shared by both programs.
>>
>> regarding your first point: i have a byte[] into which i serialize the
>> data first using a ByteBuffer, and then write that data to a
>> MappedByteBuffer.
>>
>> regarding synchronization: i couldn't find a way to use elaborate things
>> like semaphores or similar that work between python and java alike.
>>
>> the data exchange is currently completely synchronous. java writes a
>> record, sets an "isWritten" bit and then repeatedly checks this bit whether
>> it is 0. python repeatedly checks this bit whether it is 1. once that
>> happens, it reads the record, sets the bit to 0 which tells java that it
>> has read the record and can write the next one. this scheme works the same
>> way the other way around.
>>
>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or rather
>> should be...) way faster (5x) that what we had so far though (asynchronous
>> pipes).
>> (i also tried different schemes that all had no effect, so i decided to
>> stick with the easiest one)
>>
>> on to your last point: I'm gonna check for that tomorrow.
>>
>>
>>
>>
>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>
>>> Hi Chesnay!
>>>
>>> That is an interesting problem, though hard to judge with the information
>>> we have.
>>>
>>> Can you elaborate a bit on the following points:
>>>
>>>    - When putting the objects from the Java Flink side into the shared
>>> memory, you need to serialize them. How do you do that? Into a buffer,
>>> then
>>> copy that into the shared memory ByteBuffer? Directly?
>>>
>>>    - Shared memory access has to be somehow controlled. The pipes give you
>>> flow control for free (blocking write calls when the stream consumer is
>>> busy). What do you do for the shared memory? Usually, one uses semaphores,
>>> or, in java File(Range)Locks to coordinate access and block until memory
>>> regions are made available. Can you check if there are some busy waiting
>>> parts in you code?
>>>
>>>    - More general: The code is slower, but does it burn CPU cycles in its
>>> slowness or is it waiting for locks / monitors / conditions ?
>>>
>>> Stephan
>>>
>>>
>>>
>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>>> chesnay.schepler@fu-berlin.de> wrote:
>>>
>>>   Hello everyone,
>>>> This will be some kind of brainstorming question.
>>>>
>>>> As some of you may know I am currently working on the Python API. The
>>>> most
>>>> crucial part here is how the data is exchanged between Java and Python.
>>>> Up to this point we used pipes for this, but switched recently to memory
>>>> mapped files in hopes of increasing the (lacking) performance.
>>>>
>>>> Early (simplified) prototypes (outside of Flink) showed that this would
>>>> yield a significant increase. yet when i added the code to flink and ran
>>>> a
>>>> job, there was
>>>> no effect. like at all. two radically different schemes ran in /exactly/
>>>> the same time.
>>>>
>>>> my conclusion was that code already in place (and not part of the
>>>> prototypes) is responsible for this.
>>>> so i went ahead and modified the prototypes to use all relevant code from
>>>> the Python API in order to narrow down the culprit. but this time, the
>>>> performance increase was there.
>>>>
>>>> Now here's the question: How can the /very same code/ perform so much
>>>> worse when integrated into flink? if the code is not the problem, what
>>>> could be it?
>>>>
>>>> i spent a lot of time looking for that one line of code that cripples the
>>>> performance, but I'm pretty much out of places to look.
>>>>
>>>>
>>>>


Re: Python API - Weird Performance Issue

Posted by Stephan Ewen <se...@apache.org>.
Hey Chesnay!

Here are some thoughts:

 - The repeated checking for 1 or 0 is indeed a busy loop. These may behave
very different in different settings. If you run the code isolated, you
have a spare core for the thread and it barely hurts. Run multiple parallel
instances in a larger framework, and it eats away CPU cycles from the
threads that do the work - it starts hurting badly.

 - You may get around a copy into the shared memory (ByteBuffer into
MemoryMappedFile) by creating an according DataOutputView - save one more
data copy. That's the next step, though, first solve the other issue.

The last time I implemented such an inter-process data pipe between
languages, I had a similar issue: No support for system wide semaphores (or
something similar) on both sides.

I used Shared memory for the buffers, and a local network socket (UDP, but
I guess TCP would be fine as well) for notifications when buffers are
available. That worked pretty well, yielded high throughput, because the
big buffers were not copied (unlike in streams), and the UDP notifications
were very fast (fire and forget datagrams).

Stephan



On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
chesnay.schepler@fu-berlin.de> wrote:

> Hey Stephan,
>
> I'd like to point out right away that the code related to your questions
> is shared by both programs.
>
> regarding your first point: i have a byte[] into which i serialize the
> data first using a ByteBuffer, and then write that data to a
> MappedByteBuffer.
>
> regarding synchronization: i couldn't find a way to use elaborate things
> like semaphores or similar that work between python and java alike.
>
> the data exchange is currently completely synchronous. java writes a
> record, sets an "isWritten" bit and then repeatedly checks this bit whether
> it is 0. python repeatedly checks this bit whether it is 1. once that
> happens, it reads the record, sets the bit to 0 which tells java that it
> has read the record and can write the next one. this scheme works the same
> way the other way around.
>
> *NOW,* this may seem ... inefficient, to put it slightly. it is (or rather
> should be...) way faster (5x) that what we had so far though (asynchronous
> pipes).
> (i also tried different schemes that all had no effect, so i decided to
> stick with the easiest one)
>
> on to your last point: I'm gonna check for that tomorrow.
>
>
>
>
> On 27.8.2014 20:45, Stephan Ewen wrote:
>
>> Hi Chesnay!
>>
>> That is an interesting problem, though hard to judge with the information
>> we have.
>>
>> Can you elaborate a bit on the following points:
>>
>>   - When putting the objects from the Java Flink side into the shared
>> memory, you need to serialize them. How do you do that? Into a buffer,
>> then
>> copy that into the shared memory ByteBuffer? Directly?
>>
>>   - Shared memory access has to be somehow controlled. The pipes give you
>> flow control for free (blocking write calls when the stream consumer is
>> busy). What do you do for the shared memory? Usually, one uses semaphores,
>> or, in java File(Range)Locks to coordinate access and block until memory
>> regions are made available. Can you check if there are some busy waiting
>> parts in you code?
>>
>>   - More general: The code is slower, but does it burn CPU cycles in its
>> slowness or is it waiting for locks / monitors / conditions ?
>>
>> Stephan
>>
>>
>>
>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>> chesnay.schepler@fu-berlin.de> wrote:
>>
>>  Hello everyone,
>>>
>>> This will be some kind of brainstorming question.
>>>
>>> As some of you may know I am currently working on the Python API. The
>>> most
>>> crucial part here is how the data is exchanged between Java and Python.
>>> Up to this point we used pipes for this, but switched recently to memory
>>> mapped files in hopes of increasing the (lacking) performance.
>>>
>>> Early (simplified) prototypes (outside of Flink) showed that this would
>>> yield a significant increase. yet when i added the code to flink and ran
>>> a
>>> job, there was
>>> no effect. like at all. two radically different schemes ran in /exactly/
>>> the same time.
>>>
>>> my conclusion was that code already in place (and not part of the
>>> prototypes) is responsible for this.
>>> so i went ahead and modified the prototypes to use all relevant code from
>>> the Python API in order to narrow down the culprit. but this time, the
>>> performance increase was there.
>>>
>>> Now here's the question: How can the /very same code/ perform so much
>>> worse when integrated into flink? if the code is not the problem, what
>>> could be it?
>>>
>>> i spent a lot of time looking for that one line of code that cripples the
>>> performance, but I'm pretty much out of places to look.
>>>
>>>
>>>
>

Re: Python API - Weird Performance Issue

Posted by Chesnay Schepler <ch...@fu-berlin.de>.
Hey Stephan,

I'd like to point out right away that the code related to your questions 
is shared by both programs.

regarding your first point: i have a byte[] into which i serialize the 
data first using a ByteBuffer, and then write that data to a 
MappedByteBuffer.

regarding synchronization: i couldn't find a way to use elaborate things 
like semaphores or similar that work between python and java alike.

the data exchange is currently completely synchronous. java writes a 
record, sets an "isWritten" bit and then repeatedly checks this bit 
whether it is 0. python repeatedly checks this bit whether it is 1. once 
that happens, it reads the record, sets the bit to 0 which tells java 
that it has read the record and can write the next one. this scheme 
works the same way the other way around.

*NOW,* this may seem ... inefficient, to put it slightly. it is (or 
rather should be...) way faster (5x) that what we had so far though 
(asynchronous pipes).
(i also tried different schemes that all had no effect, so i decided to 
stick with the easiest one)

on to your last point: I'm gonna check for that tomorrow.



On 27.8.2014 20:45, Stephan Ewen wrote:
> Hi Chesnay!
>
> That is an interesting problem, though hard to judge with the information
> we have.
>
> Can you elaborate a bit on the following points:
>
>   - When putting the objects from the Java Flink side into the shared
> memory, you need to serialize them. How do you do that? Into a buffer, then
> copy that into the shared memory ByteBuffer? Directly?
>
>   - Shared memory access has to be somehow controlled. The pipes give you
> flow control for free (blocking write calls when the stream consumer is
> busy). What do you do for the shared memory? Usually, one uses semaphores,
> or, in java File(Range)Locks to coordinate access and block until memory
> regions are made available. Can you check if there are some busy waiting
> parts in you code?
>
>   - More general: The code is slower, but does it burn CPU cycles in its
> slowness or is it waiting for locks / monitors / conditions ?
>
> Stephan
>
>
>
> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
> chesnay.schepler@fu-berlin.de> wrote:
>
>> Hello everyone,
>>
>> This will be some kind of brainstorming question.
>>
>> As some of you may know I am currently working on the Python API. The most
>> crucial part here is how the data is exchanged between Java and Python.
>> Up to this point we used pipes for this, but switched recently to memory
>> mapped files in hopes of increasing the (lacking) performance.
>>
>> Early (simplified) prototypes (outside of Flink) showed that this would
>> yield a significant increase. yet when i added the code to flink and ran a
>> job, there was
>> no effect. like at all. two radically different schemes ran in /exactly/
>> the same time.
>>
>> my conclusion was that code already in place (and not part of the
>> prototypes) is responsible for this.
>> so i went ahead and modified the prototypes to use all relevant code from
>> the Python API in order to narrow down the culprit. but this time, the
>> performance increase was there.
>>
>> Now here's the question: How can the /very same code/ perform so much
>> worse when integrated into flink? if the code is not the problem, what
>> could be it?
>>
>> i spent a lot of time looking for that one line of code that cripples the
>> performance, but I'm pretty much out of places to look.
>>
>>


Re: Python API - Weird Performance Issue

Posted by Stephan Ewen <se...@apache.org>.
Hi Chesnay!

That is an interesting problem, though hard to judge with the information
we have.

Can you elaborate a bit on the following points:

 - When putting the objects from the Java Flink side into the shared
memory, you need to serialize them. How do you do that? Into a buffer, then
copy that into the shared memory ByteBuffer? Directly?

 - Shared memory access has to be somehow controlled. The pipes give you
flow control for free (blocking write calls when the stream consumer is
busy). What do you do for the shared memory? Usually, one uses semaphores,
or, in java File(Range)Locks to coordinate access and block until memory
regions are made available. Can you check if there are some busy waiting
parts in you code?

 - More general: The code is slower, but does it burn CPU cycles in its
slowness or is it waiting for locks / monitors / conditions ?

Stephan



On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
chesnay.schepler@fu-berlin.de> wrote:

> Hello everyone,
>
> This will be some kind of brainstorming question.
>
> As some of you may know I am currently working on the Python API. The most
> crucial part here is how the data is exchanged between Java and Python.
> Up to this point we used pipes for this, but switched recently to memory
> mapped files in hopes of increasing the (lacking) performance.
>
> Early (simplified) prototypes (outside of Flink) showed that this would
> yield a significant increase. yet when i added the code to flink and ran a
> job, there was
> no effect. like at all. two radically different schemes ran in /exactly/
> the same time.
>
> my conclusion was that code already in place (and not part of the
> prototypes) is responsible for this.
> so i went ahead and modified the prototypes to use all relevant code from
> the Python API in order to narrow down the culprit. but this time, the
> performance increase was there.
>
> Now here's the question: How can the /very same code/ perform so much
> worse when integrated into flink? if the code is not the problem, what
> could be it?
>
> i spent a lot of time looking for that one line of code that cripples the
> performance, but I'm pretty much out of places to look.
>
>